From beb3d1cea1ea871f78f17fd8a06b38c2c73a8c04 Mon Sep 17 00:00:00 2001 From: Max O'Cull Date: Sun, 21 Apr 2019 15:08:46 -0400 Subject: [PATCH] Add distinct --- p4/src/main/java/cs448/Project4.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java index 79b533e..8b3bb52 100644 --- a/p4/src/main/java/cs448/Project4.java +++ b/p4/src/main/java/cs448/Project4.java @@ -123,14 +123,15 @@ public class Project4 { JavaRDD filteredRatings = ratingRDD.filter(r -> r.getRating() == conf.q3Rating); JavaPairRDD filteredRatingsPair = filteredRatings.mapToPair(r -> new Tuple2(r.getUserId(), r)); - JavaPairRDD filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m)); - JavaPairRDD> usersRatingsJoin = filteredUsersPair.join(filteredRatingsPair); + JavaPairRDD filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m)); + // Map the previous RDD to JavaPairRDD movieMatchRatings = usersRatingsJoin.mapToPair(t -> new Tuple2(t._2()._2().getMovieId(), t._2()._2())); JavaPairRDD> moviesJoined = movieMatchRatings.join(filteredMoviesPair); - JavaRDD movieIdRDD = moviesJoined.map(t -> t._1().toString()); + + JavaRDD movieIdRDD = moviesJoined.map(t -> t._1().toString()).distinct(); movieIdRDD.collect().forEach(x -> System.out.println(x)); movieIdRDD.saveAsTextFile(CS448Utils.resolveUri(conf.outPath, "query-3"));