Implement basic SQL for all queries

This commit is contained in:
Max O'Cull 2019-04-20 21:43:45 -04:00
parent 182acfd7fd
commit 921f7fddcc

View File

@ -53,29 +53,131 @@ public class Project4 {
Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
ratingDF.createOrReplaceTempView("Rating");
// Compute the result.
Dataset<Row> resultDF = spark.sql("SELECT DISTINCT m.title FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
conf.q1Occupation + " AND r.rating >= " + conf.q1Rating);
//resultDF.show();
resultDF.foreach(x -> System.out.println(x.getString(0)));
resultDF.show();
resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-1"));
//Don't forget to stop spark session
spark.stop();
}
/**
 * Query 2: print and persist the distinct zip codes of users whose occupation
 * matches either of the two configured occupation codes.
 *
 * @param conf job configuration carrying input/output paths and the two
 *             occupation codes (q2Occupation1, q2Occupation2)
 */
public void runSparkApp2(App.Conf conf){
    System.out.println("Running Spark App for Query 2");
    // Create a Spark Session.
    SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 2").getOrCreate();
    // Read and parse each data file, then register it as a temp view for SQL.
    JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
            .javaRDD()
            .map(User::parseUser);
    Dataset<Row> userDF = spark.createDataFrame(userRDD, User.class);
    userDF.createOrReplaceTempView("User");
    JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache()
            .javaRDD()
            .map(Movie::parseMovie);
    Dataset<Row> movieDF = spark.createDataFrame(movieRDD, Movie.class);
    movieDF.createOrReplaceTempView("Movie");
    JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache()
            .javaRDD()
            .map(Rating::parseRating);
    Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
    ratingDF.createOrReplaceTempView("Rating");
    // Compute the result. The column must be "zipCode", not "zip-code": the
    // User view is derived from the User JavaBean, and Java property names
    // cannot contain hyphens; "u.zip-code" would be parsed as the subtraction
    // "u.zip - code" and fail analysis.
    // NOTE(review): assumes the User bean property is zipCode — confirm
    // against the User class definition.
    Dataset<Row> resultDF = spark.sql("SELECT DISTINCT u.zipCode FROM User u WHERE u.occupation = " +
            conf.q2Occupation1 + " OR u.occupation = " +
            conf.q2Occupation2);
    resultDF.show();
    resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-2"));
    // Don't forget to stop the spark session.
    spark.stop();
}
/**
 * Query 3: print and persist the distinct ids of movies that received exactly
 * the configured rating from a user with the configured occupation.
 *
 * @param conf job configuration carrying input/output paths, q3Occupation and
 *             q3Rating
 */
public void runSparkApp3(App.Conf conf){
    System.out.println("Running Spark App for Query 3");
    // Create a Spark Session.
    SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 3").getOrCreate();
    // Read and parse each data file, then register it as a temp view for SQL.
    JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
            .javaRDD()
            .map(User::parseUser);
    Dataset<Row> userDF = spark.createDataFrame(userRDD, User.class);
    userDF.createOrReplaceTempView("User");
    JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache()
            .javaRDD()
            .map(Movie::parseMovie);
    Dataset<Row> movieDF = spark.createDataFrame(movieRDD, Movie.class);
    movieDF.createOrReplaceTempView("Movie");
    JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache()
            .javaRDD()
            .map(Rating::parseRating);
    Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
    ratingDF.createOrReplaceTempView("Rating");
    // Compute the result. CAST movieId to STRING because the text data source
    // only supports writing a single STRING column; a numeric movieId column
    // (as derived from the Movie bean) would make write().text() fail.
    // NOTE(review): assumes movieId is numeric in the Movie bean — confirm.
    Dataset<Row> resultDF = spark.sql("SELECT DISTINCT CAST(m.movieId AS STRING) AS movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
            conf.q3Occupation + " AND r.rating = " + conf.q3Rating);
    resultDF.show();
    resultDF.write().text(CS448Utils.resolveUri(conf.outPath, "query-3"));
    // Don't forget to stop the spark session.
    spark.stop();
}
/**
 * Query 4: print and persist, for every movie rated by a user of the
 * configured age, the movie title and its average rating over those users.
 *
 * @param conf job configuration carrying input/output paths and q4Age
 */
public void runSparkApp4(App.Conf conf){
    System.out.println("Running Spark App for Query 4");
    // Create a Spark Session.
    SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 4").getOrCreate();
    // Read and parse each data file, then register it as a temp view for SQL.
    JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
            .javaRDD()
            .map(User::parseUser);
    Dataset<Row> userDF = spark.createDataFrame(userRDD, User.class);
    userDF.createOrReplaceTempView("User");
    JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache()
            .javaRDD()
            .map(Movie::parseMovie);
    Dataset<Row> movieDF = spark.createDataFrame(movieRDD, Movie.class);
    movieDF.createOrReplaceTempView("Movie");
    JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache()
            .javaRDD()
            .map(Rating::parseRating);
    Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
    ratingDF.createOrReplaceTempView("Rating");
    // Compute the result. GROUP BY m.title is required: selecting m.title
    // alongside AVG(r.rating) without grouping is an invalid aggregate query
    // and Spark rejects it at analysis time.
    Dataset<Row> resultDF = spark.sql("SELECT m.title, AVG(r.rating) AS avgRating FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " +
            conf.q4Age + " GROUP BY m.title");
    resultDF.show();
    // Use csv, not text: the text data source only supports a single STRING
    // column and this result has two columns (title, avgRating).
    resultDF.write().csv(CS448Utils.resolveUri(conf.outPath, "query-4"));
    // Don't forget to stop the spark session.
    spark.stop();
}
}