Query 3 to RDD
This commit is contained in:
parent
367a7bd713
commit
a1bb490960
@ -1,26 +1,14 @@
|
|||||||
package cs448;
|
package cs448;
|
||||||
|
|
||||||
import org.apache.commons.cli.*;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.Optional;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.Function;
|
|
||||||
import org.apache.spark.api.java.function.Function2;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.Tuple4;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import scala.Tuple2;
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class Project4 {
|
public class Project4 {
|
||||||
public void runSparkApp1(App.Conf conf){
|
public void runSparkApp1(App.Conf conf){
|
||||||
@ -33,10 +21,6 @@ public class Project4 {
|
|||||||
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 1").getOrCreate();
|
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 1").getOrCreate();
|
||||||
|
|
||||||
// Write data processing code here
|
// Write data processing code here
|
||||||
String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName};
|
|
||||||
Dataset<String> data;
|
|
||||||
|
|
||||||
//// Reading, Parsing and counting lines for each of the data files
|
|
||||||
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(User::parseUser);
|
.map(User::parseUser);
|
||||||
@ -72,10 +56,6 @@ public class Project4 {
|
|||||||
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 2").getOrCreate();
|
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 2").getOrCreate();
|
||||||
|
|
||||||
// Write data processing code here
|
// Write data processing code here
|
||||||
String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName};
|
|
||||||
Dataset<String> data;
|
|
||||||
|
|
||||||
//// Reading, Parsing and counting lines for each of the data files
|
|
||||||
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(User::parseUser);
|
.map(User::parseUser);
|
||||||
@ -112,32 +92,47 @@ public class Project4 {
|
|||||||
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 3").getOrCreate();
|
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 3").getOrCreate();
|
||||||
|
|
||||||
// Write data processing code here
|
// Write data processing code here
|
||||||
String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName};
|
|
||||||
Dataset<String> data;
|
|
||||||
|
|
||||||
//// Reading, Parsing and counting lines for each of the data files
|
|
||||||
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(User::parseUser);
|
.map(User::parseUser);
|
||||||
Dataset<Row> userDF = spark.createDataFrame(userRDD, User.class);
|
// Dataset<Row> userDF = spark.createDataFrame(userRDD, User.class);
|
||||||
userDF.createOrReplaceTempView("User");
|
// userDF.createOrReplaceTempView("User");
|
||||||
|
|
||||||
JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache()
|
JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.moviesFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(Movie::parseMovie);
|
.map(Movie::parseMovie);
|
||||||
Dataset<Row> movieDF = spark.createDataFrame(movieRDD, Movie.class);
|
// Dataset<Row> movieDF = spark.createDataFrame(movieRDD, Movie.class);
|
||||||
movieDF.createOrReplaceTempView("Movie");
|
// movieDF.createOrReplaceTempView("Movie");
|
||||||
|
|
||||||
JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache()
|
JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.ratingsFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(Rating::parseRating);
|
.map(Rating::parseRating);
|
||||||
Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
|
// Dataset<Row> ratingDF = spark.createDataFrame(ratingRDD, Rating.class);
|
||||||
ratingDF.createOrReplaceTempView("Rating");
|
// ratingDF.createOrReplaceTempView("Rating");
|
||||||
|
|
||||||
// Compute the result.
|
// Compute the result.
|
||||||
Dataset<Row> resultDF = spark.sql("SELECT DISTINCT m.movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
|
// Dataset<Row> resultDF = spark.sql("SELECT DISTINCT m.movieId FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.occupation = " +
|
||||||
conf.q3Occupation + " AND r.rating = " + conf.q3Rating);
|
// conf.q3Occupation + " AND r.rating = " + conf.q3Rating);
|
||||||
resultDF.show();
|
// resultDF.show();
|
||||||
|
// Dataset<String> CompressedResult = resultDF.map((MapFunction<Row, String>) row -> "" + row.getInt(0), Encoders.STRING());
|
||||||
|
// CompressedResult.show();
|
||||||
|
// CompressedResult.write().text(CS448Utils.resolveUri(conf.outPath, "query-3"));
|
||||||
|
JavaRDD<User> filteredUsers = userRDD.filter(u -> u.getOccupation().equals(conf.q3Occupation));
|
||||||
|
JavaPairRDD<Integer, User> filteredUsersPair = filteredUsers.mapToPair(u -> new Tuple2(u.getUserId(), u));
|
||||||
|
|
||||||
|
JavaRDD<Rating> filteredRatings = ratingRDD.filter(r -> r.getRating() == conf.q3Rating);
|
||||||
|
JavaPairRDD<Integer, Rating> filteredRatingsPair = filteredRatings.mapToPair(r -> new Tuple2(r.getUserId(), r));
|
||||||
|
|
||||||
|
JavaPairRDD<Integer, Movie> filteredMoviesPair = movieRDD.mapToPair(m -> new Tuple2(m.getMovieId(), m));
|
||||||
|
|
||||||
|
JavaPairRDD<Integer, Tuple2<User, Rating>> usersRatingsJoin = filteredUsersPair.join(filteredRatingsPair);
|
||||||
|
|
||||||
|
// Map the previous RDD to <MovieID, Rating>
|
||||||
|
JavaPairRDD<Integer, Rating> movieMatchRatings = usersRatingsJoin.mapToPair(t -> new Tuple2(t._2()._2().getMovieId(), t._2()._2()));
|
||||||
|
JavaPairRDD<Integer, Tuple2<Rating, Movie>> moviesJoined = movieMatchRatings.join(filteredMoviesPair);
|
||||||
|
JavaRDD<Integer> movieIdRDD = moviesJoined.map(t -> t._1());
|
||||||
|
|
||||||
|
Dataset<Row> resultDF = spark.createDataFrame(movieIdRDD, Integer.class);
|
||||||
Dataset<String> CompressedResult = resultDF.map((MapFunction<Row, String>) row -> "" + row.getInt(0), Encoders.STRING());
|
Dataset<String> CompressedResult = resultDF.map((MapFunction<Row, String>) row -> "" + row.getInt(0), Encoders.STRING());
|
||||||
CompressedResult.show();
|
CompressedResult.show();
|
||||||
CompressedResult.write().text(CS448Utils.resolveUri(conf.outPath, "query-3"));
|
CompressedResult.write().text(CS448Utils.resolveUri(conf.outPath, "query-3"));
|
||||||
@ -153,10 +148,6 @@ public class Project4 {
|
|||||||
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 4").getOrCreate();
|
SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Query 4").getOrCreate();
|
||||||
|
|
||||||
// Write data processing code here
|
// Write data processing code here
|
||||||
String dataFiles[] = {conf.usersFName, conf.moviesFName, conf.ratingsFName};
|
|
||||||
Dataset<String> data;
|
|
||||||
|
|
||||||
//// Reading, Parsing and counting lines for each of the data files
|
|
||||||
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath,conf.usersFName)).cache()
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(User::parseUser);
|
.map(User::parseUser);
|
||||||
@ -176,8 +167,6 @@ public class Project4 {
|
|||||||
ratingDF.createOrReplaceTempView("Rating");
|
ratingDF.createOrReplaceTempView("Rating");
|
||||||
|
|
||||||
// Compute the result.
|
// Compute the result.
|
||||||
// "SELECT m.title, r.average FROM Movie m, (SELECT AVG(r.rating) as average FROM Rating r, User u WHERE r.userId = u.userId AND u.age = " + conf.q4Age + ") r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " +
|
|
||||||
// conf.q4Age);
|
|
||||||
Dataset<Row> resultDF = spark.sql(
|
Dataset<Row> resultDF = spark.sql(
|
||||||
"SELECT m.title, AVG(r.rating) FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " + conf.q4Age + " GROUP BY m.title");
|
"SELECT m.title, AVG(r.rating) FROM Movie m, Rating r, User u WHERE m.movieId = r.movieId AND r.userId = u.userId AND u.age = " + conf.q4Age + " GROUP BY m.title");
|
||||||
Dataset<String> CompressedResult = resultDF.map((MapFunction<Row, String>) row -> row.getString(0) + "::" + row.getDouble(1), Encoders.STRING());
|
Dataset<String> CompressedResult = resultDF.map((MapFunction<Row, String>) row -> row.getString(0) + "::" + row.getDouble(1), Encoders.STRING());
|
||||||
|
Loading…
Reference in New Issue
Block a user