diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java index 79b533e..8b3bb52 100644 --- a/p4/src/main/java/cs448/Project4.java +++ b/p4/src/main/java/cs448/Project4.java @@ -123,14 +123,15 @@ public class Project4 { JavaRDD filteredRatings = ratingRDD.filter(r -> r.getRating() == conf.q3Rating); JavaPairRDD filteredRatingsPair = filteredRatings.mapToPair(r -> new Tuple2(r.getUserId(), r)); - JavaPairRDD filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m)); - JavaPairRDD> usersRatingsJoin = filteredUsersPair.join(filteredRatingsPair); + JavaPairRDD filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m)); + // Map the previous RDD to JavaPairRDD movieMatchRatings = usersRatingsJoin.mapToPair(t -> new Tuple2(t._2()._2().getMovieId(), t._2()._2())); JavaPairRDD> moviesJoined = movieMatchRatings.join(filteredMoviesPair); - JavaRDD movieIdRDD = moviesJoined.map(t -> t._1().toString()); + + JavaRDD movieIdRDD = moviesJoined.map(t -> t._1().toString()).distinct(); movieIdRDD.collect().forEach(x -> System.out.println(x)); movieIdRDD.saveAsTextFile(CS448Utils.resolveUri(conf.outPath, "query-3"));