diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java
index 8b3bb52..ef927ac 100644
--- a/p4/src/main/java/cs448/Project4.java
+++ b/p4/src/main/java/cs448/Project4.java
@@ -40,10 +40,29 @@ public class Project4 {
         ratingDF.createOrReplaceTempView("Rating");
 
         // Compute the result.
-        Dataset resultDF = spark.sql("SELECT DISTINCT m.title FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
-                conf.q1Occupation + " AND r.rating >= " + conf.q1Rating);
-        resultDF.show();
-        resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-1"));
+        // Query 1 with the RDD API: distinct titles of movies that received a
+        // rating of at least conf.q1Rating from a user whose occupation is
+        // conf.q1Occupation (mirrors the SELECT DISTINCT m.title query it replaces).
+        JavaRDD<String> titleRDD = userRDD
+                // Keep only users with the requested occupation, keyed by userId.
+                .filter(u -> u.getOccupation().equals(conf.q1Occupation))
+                .mapToPair(u -> new Tuple2<>(u.getUserId(), u))
+                // Join with the ratings that pass the threshold, also keyed by userId.
+                .join(ratingRDD
+                        .filter(r -> r.getRating() >= conf.q1Rating)
+                        .mapToPair(r -> new Tuple2<>(r.getUserId(), r)))
+                // Re-key every surviving rating by the movie it refers to.
+                .mapToPair(t -> new Tuple2<>(t._2()._2().getMovieId(), t._2()._2()))
+                // Join with the movies, keyed by movieId, so the title is reachable.
+                .join(movieRDD.mapToPair(m -> new Tuple2<>(m.getMovieId(), m)))
+                // NOTE(review): assumes Movie exposes getTitle() for the "title"
+                // column used by the SQL query - confirm against Movie.java.
+                .map(t -> t._2()._2().getTitle())
+                .distinct()
+                // Cached because it is evaluated twice below (collect, then save).
+                .cache();
+        titleRDD.collect().forEach(System.out::println);
+        titleRDD.saveAsTextFile(CS448Utils.resolveUri(conf.outPath, "query-1"));
 
         //Don't forget to stop spark session
         spark.stop();