Add greater than operator for Q1

This commit is contained in:
Max O'Cull 2019-04-21 15:20:17 -04:00
parent beb3d1cea1
commit cc894e8ae3

View File

@@ -40,10 +40,27 @@ public class Project4 {
// Register the ratings DataFrame under the view name "Rating" so the SQL below can reference it.
ratingDF.createOrReplaceTempView("Rating");

// Query 1 (Spark SQL form): distinct titles of movies that received a rating of at least
// conf.q1Rating from a user whose occupation equals conf.q1Occupation.
// Both configuration values are spliced directly into the SQL text.
String q1Sql = "SELECT DISTINCT m.title FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = "
        + conf.q1Occupation
        + " AND r.rating >= "
        + conf.q1Rating;
Dataset<Row> resultDF = spark.sql(q1Sql);

// Print a preview of the result, then write it out as text under "query-1".
resultDF.show();
String q1OutUri = CS448Utils.resolveUri(conf.outPath, "query-1");
resultDF.write().text(q1OutUri);
// Dataset<Row> resultDF = spark.sql("SELECT DISTINCT m.title FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
// conf.q1Occupation + " AND r.rating >= " + conf.q1Rating);
// resultDF.show();
// resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-1"));
// Query 1 (RDD form): keep users with the configured occupation and ratings at or above the
// configured threshold, join them on user ID, join the surviving ratings with movies on
// movie ID, and emit each matching movie ID once.

// Users whose occupation equals conf.q1Occupation, keyed by user ID.
JavaRDD<User> filteredUsers = userRDD.filter(u -> u.getOccupation().equals(conf.q1Occupation));
JavaPairRDD<Integer, User> filteredUsersPair = filteredUsers.mapToPair(u -> new Tuple2<>(u.getUserId(), u));

// Ratings with rating >= conf.q1Rating, keyed by the rating user's ID.
JavaRDD<Rating> filteredRatings = ratingRDD.filter(r -> r.getRating() >= conf.q1Rating);
JavaPairRDD<Integer, Rating> filteredRatingsPair = filteredRatings.mapToPair(r -> new Tuple2<>(r.getUserId(), r));

// Inner join on user ID: only ratings made by the selected users remain.
JavaPairRDD<Integer, Tuple2<User, Rating>> usersRatingsJoin = filteredUsersPair.join(filteredRatingsPair);

// All movies, keyed by movie ID (no filtering is applied to movies).
JavaPairRDD<Integer, Movie> filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2<>(m.getMovieId(), m));

// Re-key the joined ratings from user ID to <MovieID, Rating> for the join with movies.
JavaPairRDD<Integer, Rating> movieMatchRatings = usersRatingsJoin.mapToPair(t -> new Tuple2<>(t._2()._2().getMovieId(), t._2()._2()));
JavaPairRDD<Integer, Tuple2<Rating, Movie>> moviesJoined = movieMatchRatings.join(filteredMoviesPair);

// Keep only the join key (the movie ID) as a string and drop duplicates.
// NOTE(review): the commented-out Spark SQL form of this query selects DISTINCT m.title,
// whereas this pipeline emits movie IDs — confirm which one the output should contain.
JavaRDD<String> movieIdRDD = moviesJoined.map(t -> t._1().toString()).distinct();

// Print every result on the driver, then save the same RDD as text under "query-1".
movieIdRDD.collect().forEach(System.out::println);
movieIdRDD.saveAsTextFile(CS448Utils.resolveUri(conf.outPath, "query-1"));

// Don't forget to stop the Spark session.
spark.stop();