diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java index 49c2bc3..87dc128 100644 --- a/p4/src/main/java/cs448/Project4.java +++ b/p4/src/main/java/cs448/Project4.java @@ -53,29 +53,131 @@ public class Project4 { Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); ratingDF.createOrReplaceTempView("Rating"); + // Compute the result. Dataset resultDF = spark.sql("SELECT DISTINCT m.title FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " + conf.q1Occupation + " AND r.rating >= " + conf.q1Rating); - //resultDF.show(); - resultDF.foreach(x -> System.out.println(x.getString(0))); + resultDF.show(); resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-1")); - //Don't forget to stop spark session spark.stop(); } public void runSparkApp2(App.Conf conf){ System.out.println("Running Spark App for Query 2"); - // Write your code here + + // Create a Spark Session. + SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 2").getOrCreate(); + + // Write data processing code here + String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; + Dataset data; + + //// Reading, Parsing and counting lines for each of the data files + JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() + .javaRDD() + .map(User::parseUser); + Dataset userDF = spark.createDataFrame(userRDD, User.class); + userDF.createOrReplaceTempView("User"); + + JavaRDD movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache() + .javaRDD() + .map(Movie::parseMovie); + Dataset movieDF = spark.createDataFrame(movieRDD, Movie.class); + movieDF.createOrReplaceTempView("Movie"); + + JavaRDD ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache() + .javaRDD() + .map(Rating::parseRating); + Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); + ratingDF.createOrReplaceTempView("Rating"); + + // Compute the result. + Dataset resultDF = spark.sql("SELECT DISTINCT u.zip-code FROM User u WHERE u.occupation = " + + conf.q2Occupation1 + " OR u.occupation = " + + conf.q2Occupation2); + resultDF.show(); + resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-2")); + + //Don't forget to stop spark session + spark.stop(); } public void runSparkApp3(App.Conf conf){ System.out.println("Running Spark App for Query 3"); - // Write your code here + + // Create a Spark Session. + SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 3").getOrCreate(); + + // Write data processing code here + String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; + Dataset data; + + //// Reading, Parsing and counting lines for each of the data files + JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() + .javaRDD() + .map(User::parseUser); + Dataset userDF = spark.createDataFrame(userRDD, User.class); + userDF.createOrReplaceTempView("User"); + + JavaRDD movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache() + .javaRDD() + .map(Movie::parseMovie); + Dataset movieDF = spark.createDataFrame(movieRDD, Movie.class); + movieDF.createOrReplaceTempView("Movie"); + + JavaRDD ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache() + .javaRDD() + .map(Rating::parseRating); + Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); + ratingDF.createOrReplaceTempView("Rating"); + + // Compute the result. + Dataset resultDF = spark.sql("SELECT DISTINCT m.movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " + + conf.q3Occupation + " AND r.rating = " + conf.q3Rating); + resultDF.show(); + resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-3")); + + //Don't forget to stop spark session + spark.stop(); } public void runSparkApp4(App.Conf conf){ System.out.println("Running Spark App for Query 4"); - // Write your code here + + // Create a Spark Session. + SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 4").getOrCreate(); + + // Write data processing code here + String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName}; + Dataset data; + + //// Reading, Parsing and counting lines for each of the data files + JavaRDD userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache() + .javaRDD() + .map(User::parseUser); + Dataset userDF = spark.createDataFrame(userRDD, User.class); + userDF.createOrReplaceTempView("User"); + + JavaRDD movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache() + .javaRDD() + .map(Movie::parseMovie); + Dataset movieDF = spark.createDataFrame(movieRDD, Movie.class); + movieDF.createOrReplaceTempView("Movie"); + + JavaRDD ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache() + .javaRDD() + .map(Rating::parseRating); + Dataset ratingDF = spark.createDataFrame(ratingRDD, Rating.class); + ratingDF.createOrReplaceTempView("Rating"); + + // Compute the result. + Dataset resultDF = spark.sql("SELECT m.title, AVG(r.rating) FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " + + conf.q4Age); + resultDF.show(); + resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-4")); + + //Don't forget to stop spark session + spark.stop(); } }