From a1bb4909609a90fc581eacef6db2e78c21a51a11 Mon Sep 17 00:00:00 2001 From: Max O'Cull Date: Sun, 21 Apr 2019 12:31:20 -0400 Subject: [PATCH] Query 3 to RDD --- p4/src/main/java/cs448/Project4.java | 71 ++++++++++++---------------- 1 file changed, 30 insertions(+), 41 deletions(-) diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java index 31b91f0..ce60666 100644 --- a/p4/src/main/java/cs448/Project4.java +++ b/p4/src/main/java/cs448/Project4.java @@ -1,26 +1,14 @@ package cs448; -import org.apache.commons.cli.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import scala.Tuple2; -import scala.Tuple4; -import java.io.IOException; -import java.io.Serializable; -import java.util.List; +import scala.Tuple2; public class Project4 { public void runSparkApp1(App.Conf conf){ @@ -33,10 +21,6 @@ public class Project4 { SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 1").getOrCreate(); // Write data processing code here - String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; - Dataset data; - - //// Reading, Parsing and counting lines for each of the data files JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() .javaRDD() .map(User::parseUser); @@ -72,10 +56,6 @@ public class Project4 { SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- 
Query 2").getOrCreate(); // Write data processing code here - String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; - Dataset data; - - //// Reading, Parsing and counting lines for each of the data files JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() .javaRDD() .map(User::parseUser); @@ -112,32 +92,47 @@ public class Project4 { SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 3").getOrCreate(); // Write data processing code here - String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; - Dataset data; - - //// Reading, Parsing and counting lines for each of the data files JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() .javaRDD() .map(User::parseUser); - Dataset userDF = spark.createDataFrame(userRDD, User.class); - userDF.createOrReplaceTempView("User"); + // Dataset userDF = spark.createDataFrame(userRDD, User.class); + // userDF.createOrReplaceTempView("User"); JavaRDD movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache() .javaRDD() .map(Movie::parseMovie); - Dataset movieDF = spark.createDataFrame(movieRDD, Movie.class); - movieDF.createOrReplaceTempView("Movie"); + // Dataset movieDF = spark.createDataFrame(movieRDD, Movie.class); + // movieDF.createOrReplaceTempView("Movie"); JavaRDD ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache() .javaRDD() .map(Rating::parseRating); - Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); - ratingDF.createOrReplaceTempView("Rating"); + // Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); + // ratingDF.createOrReplaceTempView("Rating"); // Compute the result. 
- Dataset resultDF = spark.sql("SELECT DISTINCT m.movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " + - conf.q3Occupation + " AND r.rating = " + conf.q3Rating); - resultDF.show(); + // Dataset resultDF = spark.sql("SELECT DISTINCT m.movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " + + // conf.q3Occupation + " AND r.rating = " + conf.q3Rating); + // resultDF.show(); + // Dataset CompressedResult = resultDF.map((MapFunction) row -> "" + row.getInt(0), Encoders.STRING()); + // CompressedResult.show(); + // CompressedResult.write().text(CS448Utils.resolveUri(conf.outPath, "query-3")); + JavaRDD filteredUsers = userRDD.filter(u -> u.getOccupation().equals(conf.q3Occupation)); + JavaPairRDD filteredUsersPair = filteredUsers.mapToPair(u -> new Tuple2(u.getUserId(), u)); + + JavaRDD filteredRatings = ratingRDD.filter(r -> r.getRating() == conf.q3Rating); + JavaPairRDD filteredRatingsPair = filteredRatings.mapToPair(r -> new Tuple2(r.getUserId(), r)); + + JavaPairRDD filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m)); + + JavaPairRDD> usersRatingsJoin = filteredUsersPair.join(filteredRatingsPair); + // usersRatingsJoin is keyed by userId; its values are the (user, rating) pairs that passed both filters. + // Re-key each joined (user, rating) pair by the rating's movieId so it can be joined with the movies. + JavaPairRDD movieMatchRatings = usersRatingsJoin.mapToPair(t -> new Tuple2(t._2()._2().getMovieId(), t._2()._2())); + JavaPairRDD> moviesJoined = movieMatchRatings.join(filteredMoviesPair); + JavaRDD movieIdRDD = moviesJoined.map(t -> t._1()).distinct(); // de-duplicate: the SQL this replaces was SELECT DISTINCT m.movieId + // Integer is not a JavaBean, so the bean-class createDataFrame overload cannot derive an int column from it; an INT encoder gives the single int column that row.getInt(0) reads below. + Dataset resultDF = spark.createDataset(movieIdRDD.rdd(), Encoders.INT()).toDF(); Dataset CompressedResult = resultDF.map((MapFunction) row -> "" + row.getInt(0), Encoders.STRING()); CompressedResult.show(); CompressedResult.write().text(CS448Utils.resolveUri(conf.outPath, "query-3")); @@ -153,10 +148,6 @@ public class Project4 { SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 4").getOrCreate(); // Write data processing code here - String 
dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; - Dataset data; - - //// Reading, Parsing and counting lines for each of the data files JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() .javaRDD() .map(User::parseUser); @@ -176,8 +167,6 @@ public class Project4 { ratingDF.createOrReplaceTempView("Rating"); // Compute the result. - // "SELECT m.title, r.average FROM Movie m, (SELECT AVG(r.rating) as average FROM Rating r, User u WHERE r.userId = u.userId AND u.age = " + conf.q4Age + ") r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " + - // conf.q4Age); Dataset resultDF = spark.sql( "SELECT m.title, AVG(r.rating) FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " + conf.q4Age + " GROUP BY m.title"); Dataset CompressedResult = resultDF.map((MapFunction) row -> row.getString(0) + "::" + row.getDouble(1), Encoders.STRING());