diff --git a/p4/pom.xml b/p4/pom.xml
new file mode 100644
index 0000000..23322c7
--- /dev/null
+++ b/p4/pom.xml
@@ -0,0 +1,55 @@
+
+
+ 4.0.0
+
+ cs448
+ p4
+ 1.0-SNAPSHOT
+
+
+
+ org.apache.spark
+ spark-sql_2.11
+ 2.2.0
+ compile
+
+
+ commons-cli
+ commons-cli
+ 1.2
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.0
+
+
+ 1.8
+
+
+
+ maven-jar-plugin
+ 3.0.2
+
+
+ maven-assembly-plugin
+
+
+
+ cs448.App
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
\ No newline at end of file
diff --git a/p4/src/.DS_Store b/p4/src/.DS_Store
new file mode 100644
index 0000000..0957181
Binary files /dev/null and b/p4/src/.DS_Store differ
diff --git a/p4/src/main/java/cs448/App.java b/p4/src/main/java/cs448/App.java
new file mode 100644
index 0000000..5210c4d
--- /dev/null
+++ b/p4/src/main/java/cs448/App.java
@@ -0,0 +1,419 @@
+package cs448;
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+import scala.Tuple4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Command-line driver for CS 448 Project 4.
+ *
+ * <p>Parses the options, then runs either the warm-up exercise or one of the four
+ * {@link Project4} queries; with {@code -test} the query output is also compared against the
+ * reference results.
+ */
+public class App {
+
+    /** Exit status reported for every usage or parameter error. */
+    private static final int EXIT_USAGE = 1;
+
+    private static Options options;
+    private static CommandLineParser parser;
+    private static SparkSession ssTest;
+
+    /**
+     * Program entry point.
+     *
+     * @param args command-line arguments; the accepted options are registered in {@code initOptions}
+     */
+    public static void main(String[] args) {
+        initOptions();
+
+        CommandLine line;
+        try {
+            // parse the command line arguments
+            line = parser.parse(options, args);
+        } catch (ParseException exp) {
+            // oops, something went wrong
+            System.err.println("Parsing failed. Reason: " + exp.getMessage());
+            appExit(EXIT_USAGE);
+            return; // never reached; lets the compiler see that 'line' is assigned below
+        }
+
+        if (!line.hasOption("i")) {
+            printHelp();
+            appExit(EXIT_USAGE);
+        }
+
+        Conf aa = new Conf(
+                line.getOptionValue("i"),
+                line.getOptionValue("o"),
+                line.getOptionValue("u", "users.dat"),
+                line.getOptionValue("m", "movies.dat"),
+                line.getOptionValue("r", "ratings.dat"));
+
+        if (line.hasOption("warmup")) {
+            warmupExercise(aa);
+            return;
+        }
+
+        // Simple query parameter check
+        if (!line.hasOption("q")) {
+            usageError("Query parameters is not supplied. Please supply query parameters");
+        }
+        String[] params = line.getOptionValue("q").split(",");
+        if (params.length < 2) {
+            usageError("Query parameters is not formatted proparly");
+        }
+
+        try {
+            aa.appNum = Integer.parseInt(params[0]);
+        } catch (NumberFormatException e) {
+            usageError("Query number is not an integer: " + params[0]);
+        }
+        aa.queryParams = params;
+
+        if (line.hasOption("test")) {
+            if (aa.outPath == null) {
+                // Without -o the test directory would resolve to the literal path "null/test".
+                usageError("Testing mode needs an output directory (-o).");
+            }
+            aa.testMode = true;
+            System.out.println("Testing mode. Ignoring query parameters and using built-in values.");
+        }
+
+        run(aa);
+    }
+
+    /** Prints the usage text generated from the registered options. */
+    private static void printHelp() {
+        new HelpFormatter().printHelp("p4", options);
+    }
+
+    /** Prints {@code message} followed by the usage text, then exits with {@link #EXIT_USAGE}. */
+    private static void usageError(String message) {
+        System.out.println(message);
+        printHelp();
+        appExit(EXIT_USAGE);
+    }
+
+    /**
+     * Terminates the JVM.
+     *
+     * @param status process exit status; non-zero reports a failure to the calling shell
+     */
+    private static void appExit(int status) {
+        System.exit(status);
+    }
+
+    /** Registers the command-line options, creates the parser and limits Spark logging to errors. */
+    private static void initOptions() {
+        options = new Options();
+        options.addOption("i", "input-directory", true, "(required) path to input directory in HDFS");
+        options.addOption("o", "output-directory", true, "path to output directory on HDFS");
+        options.addOption("u", "user-data-filename", true, "User data filename");
+        options.addOption("m", "movies-data-filename", true, "Movies data filename");
+        options.addOption("r", "rating-data-filename", true, "Ratings data filename");
+        options.addOption("q", "query-params", true, "Selects what query to execute and its parameters. Comma-separated format: QueryNumber,param1,param2,...");
+        options.addOption("warmup", false,"Run warm-up exercise");
+        options.addOption("test", false,"Run test - using fixed parameters");
+        parser = new BasicParser();
+        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
+    }
+
+    /**
+     * Warm-up exercise: parses the three data files and prints how many lines each one holds.
+     *
+     * @param conf supplies the input directory and the three data file names
+     */
+    private static void warmupExercise(Conf conf) {
+        System.out.println("*** WARM-UP EXERCISE ***");
+
+        // Creating a Spark session
+        SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Warmup Exercise").getOrCreate();
+        try {
+            // Reading, Parsing and counting lines for each of the data files
+            JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.usersFName)).cache()
+                    .javaRDD()
+                    .map(User::parseUser);
+            printLineCount(conf.usersFName, userRDD.count());
+
+            JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.moviesFName)).cache()
+                    .javaRDD()
+                    .map(Movie::parseMovie);
+            printLineCount(conf.moviesFName, movieRDD.count());
+
+            JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.ratingsFName)).cache()
+                    .javaRDD()
+                    .map(Rating::parseRating);
+            printLineCount(conf.ratingsFName, ratingRDD.count());
+        } finally {
+            // Terminating the Spark session, even when reading or parsing fails
+            spark.stop();
+        }
+    }
+
+    /** Prints the line count obtained for one data file. */
+    private static void printLineCount(String fileName, long lineCount) {
+        System.out.println("Total lines in data file ( " + fileName + " ) : " + lineCount);
+    }
+
+    /**
+     * Resolves the parameters of the selected query, runs it and, in test mode, checks its output.
+     *
+     * @param conf run configuration; {@code appNum} and {@code queryParams} must already be set
+     */
+    private static void run(Conf conf) {
+        if (conf.appNum < 1 || conf.appNum > 4) {
+            // Reject unknown numbers up front, before any earlier test output is deleted.
+            usageError(String.format("Unknown query number %d. Supported queries are 1, 2, 3 and 4.", conf.appNum));
+        }
+        try {
+            loadQueryParams(conf);
+        } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
+            // A parameter is missing or is not an integer.
+            usageError(String.format("Error parsing Query %d Query parameters format error.", conf.appNum));
+        }
+
+        if (conf.testMode) {
+            prepareTestOutput(conf);
+        }
+
+        Project4 p4 = new Project4();
+        switch (conf.appNum) {
+            case 1:
+                p4.runSparkApp1(conf);
+                break;
+            case 2:
+                p4.runSparkApp2(conf);
+                break;
+            case 3:
+                p4.runSparkApp3(conf);
+                break;
+            case 4:
+                p4.runSparkApp4(conf);
+                break;
+            default:
+                throw new IllegalStateException("Unsupported query number: " + conf.appNum);
+        }
+
+        if (conf.testMode) {
+            testQuery(conf);
+        }
+    }
+
+    /**
+     * Fills in the parameters of the selected query, from the built-in test values in test mode
+     * and from {@code conf.queryParams} otherwise.
+     *
+     * @throws NumberFormatException if a supplied parameter is not an integer
+     * @throws ArrayIndexOutOfBoundsException if the query needs more parameters than were supplied
+     */
+    private static void loadQueryParams(Conf conf) {
+        boolean test = conf.testMode;
+        String[] qp = conf.queryParams;
+        switch (conf.appNum) {
+            case 1:
+                conf.q1Occupation = test ? CS448Utils.TEST_q1Occupation : Integer.parseInt(qp[1]);
+                conf.q1Rating = test ? CS448Utils.TEST_q1Rating : Integer.parseInt(qp[2]);
+                break;
+            case 2:
+                conf.q2Occupation1 = test ? CS448Utils.TEST_q2Occupation1 : Integer.parseInt(qp[1]);
+                conf.q2Occupation2 = test ? CS448Utils.TEST_q2Occupation2 : Integer.parseInt(qp[2]);
+                break;
+            case 3:
+                conf.q3Rating = test ? CS448Utils.TEST_q3Rating : Integer.parseInt(qp[1]);
+                conf.q3Occupation = test ? CS448Utils.TEST_q3Occupation : Integer.parseInt(qp[2]);
+                break;
+            case 4:
+                conf.q4Age = test ? CS448Utils.TEST_q4Age : Integer.parseInt(qp[1]);
+                break;
+            default:
+                throw new IllegalStateException("Unsupported query number: " + conf.appNum);
+        }
+    }
+
+    /**
+     * Switches the run to the {@code <outPath>/test} directory and removes output left there by
+     * an earlier test run.
+     */
+    private static void prepareTestOutput(Conf conf) {
+        conf.outPath += "/test";
+        ssTest = SparkSession.builder().appName("CS 448 Project 4 -- Test session").getOrCreate();
+        Path testPath = new Path(CS448Utils.resolveUri(conf.outPath));
+        try {
+            // Resolve the file system from the path itself: the URI is fully qualified, so the
+            // default file system of the configuration is not necessarily the one that owns it.
+            FileSystem hdfs = testPath.getFileSystem(ssTest.sparkContext().hadoopConfiguration());
+            if (hdfs.exists(testPath)) {
+                hdfs.delete(testPath, true);
+            }
+        } catch (IOException e) {
+            System.err.println("Could not clear previous test output at " + testPath);
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Compares the output of the selected query with the reference results and prints the verdict.
+     *
+     * @param conf run configuration; the output is read from {@code <outPath>/query-<appNum>}
+     */
+    private static void testQuery(Conf conf) {
+        int queryNum = conf.appNum;
+        System.out.println(String.format("Running test for Query %d", queryNum));
+        ssTest = SparkSession.builder().appName(String.format("CS 448 Project 4 -- Test session for Q%d", queryNum)).getOrCreate();
+        try {
+            String expectedUri = CS448Utils.getTestUri(queryNum);
+            String actualUri = CS448Utils.resolveUri(conf.outPath + String.format("/query-%d", queryNum));
+            // Compare results
+            boolean res = (queryNum != 4)
+                    ? linesMatch(expectedUri, actualUri)
+                    : averagesMatch(expectedUri, actualUri);
+            if (res) {
+                System.out.println(String.format("Test for Query %d PASSED", queryNum));
+            } else {
+                System.out.println(String.format("Test for Query %d FAILED!!!", queryNum));
+            }
+        } finally {
+            ssTest.stop();
+        }
+    }
+
+    /**
+     * Checks that two text outputs hold the same lines.
+     *
+     * <p>NOTE(review): a line repeated in both files yields several join rows, so identical
+     * outputs that contain duplicate lines are reported as different.
+     */
+    private static boolean linesMatch(String expectedUri, String actualUri) {
+        JavaPairRDD<String, Long> expected = ssTest.read().textFile(expectedUri).javaRDD().mapToPair(l -> new Tuple2<>(l, 1L));
+        JavaPairRDD<String, Long> actual = ssTest.read().textFile(actualUri).javaRDD().mapToPair(l -> new Tuple2<>(l, 1L));
+        JavaPairRDD<String, Tuple2<Optional<Long>, Optional<Long>>> joined = actual.fullOuterJoin(expected);
+        long expectedCnt = expected.count();
+        long actualCnt = actual.count();
+        long joinCnt = joined.count();
+        // Equal sizes plus a join of that same size mean neither side has an unmatched line.
+        return expectedCnt == actualCnt && expectedCnt == joinCnt;
+    }
+
+    /**
+     * Checks that two {@code key::average} outputs hold the same keys and that every pair of
+     * averages stays within the 0.1 tolerance.
+     */
+    private static boolean averagesMatch(String expectedUri, String actualUri) {
+        JavaPairRDD<String, Double> expected = ssTest.read().textFile(expectedUri).javaRDD().mapToPair(App::parseAverageLine);
+        JavaPairRDD<String, Double> actual = ssTest.read().textFile(actualUri).javaRDD().mapToPair(App::parseAverageLine);
+        JavaPairRDD<String, Tuple2<Optional<Double>, Optional<Double>>> joined = actual.fullOuterJoin(expected).cache();
+        long expectedCnt = expected.count();
+        long actualCnt = actual.count();
+        long joinCnt = joined.count();
+        if (expectedCnt != actualCnt || expectedCnt != joinCnt) {
+            return false;
+        }
+
+        // check values
+        JavaRDD<Boolean> avgOk = joined.map(p -> {
+            Optional<Double> got = p._2()._1();
+            Optional<Double> want = p._2()._2();
+            return got.isPresent() && want.isPresent() && Math.abs(got.get() - want.get()) <= 0.1F;
+        });
+        return avgOk.fold(true, (a, b) -> a && b);
+    }
+
+    /** Splits a {@code key::average} line into its key and its numeric value. */
+    private static Tuple2<String, Double> parseAverageLine(String line) {
+        String[] ol = line.split("::");
+        return new Tuple2<>(ol[0], Double.parseDouble(ol[1]));
+    }
+
+    /**
+     * Run configuration shared between the driver and the {@link Project4} queries.
+     *
+     * <p>The fields are package-private and mutable: the driver fills them in step by step.
+     */
+    public static class Conf implements Serializable {
+
+        String inPath, outPath, usersFName, moviesFName, ratingsFName;
+        int appNum;
+        String[] queryParams;
+        int q1Occupation, q1Rating;
+        int q2Occupation1, q2Occupation2;
+        int q3Occupation, q3Rating;
+        int q4Age;
+        boolean testMode;
+
+        /**
+         * Creates a configuration with test mode switched off.
+         *
+         * @param inPath input directory
+         * @param outPath output directory; {@code null} when {@code -o} was not given
+         * @param usersFName users data file name
+         * @param moviesFName movies data file name
+         * @param ratingsFName ratings data file name
+         */
+        public Conf(String inPath, String outPath, String usersFName, String moviesFName, String ratingsFName) {
+            this.inPath = inPath;
+            this.outPath = outPath;
+            this.usersFName = usersFName;
+            this.moviesFName = moviesFName;
+            this.ratingsFName = ratingsFName;
+            testMode = false;
+        }
+
+        public String getInPath() {
+            return inPath;
+        }
+
+        public void setInPath(String inPath) {
+            this.inPath = inPath;
+        }
+
+        public String getOutPath() {
+            return outPath;
+        }
+
+        public void setOutPath(String outPath) {
+            this.outPath = outPath;
+        }
+
+        public String getUserFName() {
+            return usersFName;
+        }
+
+        public void setUserFName(String userFName) {
+            this.usersFName = userFName;
+        }
+
+        public String getMoviesFName() {
+            return moviesFName;
+        }
+
+        public void setMoviesFName(String moviesFName) {
+            this.moviesFName = moviesFName;
+        }
+
+        public String getRatingsFName() {
+            return ratingsFName;
+        }
+
+        public void setRatingsFName(String ratingsFName) {
+            this.ratingsFName = ratingsFName;
+        }
+    }
+}
\ No newline at end of file
diff --git a/p4/src/main/java/cs448/CS448Utils.java b/p4/src/main/java/cs448/CS448Utils.java
new file mode 100644
index 0000000..14aa1af
--- /dev/null
+++ b/p4/src/main/java/cs448/CS448Utils.java
@@ -0,0 +1,64 @@
+package cs448;
+
+/**
+ * Shared test constants and HDFS URI helpers for the project.
+ */
+public class CS448Utils {
+
+    public static String HDFS_URI = "hdfs://scholar-h000.rcac.purdue.edu:8020";
+
+    //Testing values for Q1
+    public static int TEST_q1Rating = 3;
+    public static int TEST_q1Occupation = 12;
+
+    //Testing values for Q2
+    public static int TEST_q2Occupation1 = 12;
+    public static int TEST_q2Occupation2 = 11;
+
+    //Testing values for Q3
+    public static int TEST_q3Occupation = 12;
+    public static int TEST_q3Rating = 3;
+
+    //Testing values for Q4
+    public static int TEST_q4Age = 18;
+
+    /**
+     * Builds the full URI of a path by prefixing it with {@link #HDFS_URI}.
+     *
+     * <p>Leading slashes of {@code path} are dropped so that absolute and relative paths both
+     * yield exactly one separator after the prefix.
+     *
+     * @param path path to resolve, with or without a leading slash
+     * @return {@code HDFS_URI + "/" + path}, without a doubled slash at the join
+     */
+    public static String resolveUri(String path){
+        String relative = String.valueOf(path);
+        int start = 0;
+        while (start < relative.length() && relative.charAt(start) == '/') {
+            start++;
+        }
+        return HDFS_URI + '/' + relative.substring(start);
+    }
+
+    /**
+     * Builds the full URI of a file inside a directory.
+     *
+     * @param dirPath directory path, with or without a leading slash
+     * @param filename name of the file inside {@code dirPath}
+     * @return the URI of {@code dirPath/filename}
+     */
+    public static String resolveUri(String dirPath, String filename){
+        return resolveUri(dirPath + '/' + filename);
+    }
+
+    /**
+     * Returns the URI of the reference output of a query.
+     *
+     * @param i query number
+     * @return the URI of {@code /user/tqadah/test/query-<i>}
+     */
+    public static String getTestUri(int i){
+        return resolveUri(String.format("/user/tqadah/test/query-%d",i));
+    }
+
+}
diff --git a/p4/src/main/java/cs448/Movie.java b/p4/src/main/java/cs448/Movie.java
new file mode 100644
index 0000000..b3e7434
--- /dev/null
+++ b/p4/src/main/java/cs448/Movie.java
@@ -0,0 +1,61 @@
+package cs448;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * One record of the movies data file: id, title and genre list.
+ */
+public class Movie implements Serializable {
+    private Integer movieId;
+    private String title;
+    private String[] genres;
+
+    /** Creates a movie with every field unset. */
+    public Movie(){}
+
+    /** Creates a movie from already-parsed field values. */
+    public Movie(Integer movieId, String title, String[] genre) {
+        this.movieId = movieId;
+        this.title = title;
+        this.genres = genre;
+    }
+
+    public Integer getMovieId() {
+        return this.movieId;
+    }
+
+    public void setMovieId(Integer movieId) {
+        this.movieId = movieId;
+    }
+
+    public String getTitle() {
+        return this.title;
+    }
+
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    public String[] getGenres() {
+        return this.genres;
+    }
+
+    public void setGenres(String[] genres) {
+        this.genres = genres;
+    }
+
+    /**
+     * Parses one {@code ::}-separated line: id, title, then the {@code |}-separated genres.
+     *
+     * @param line a line of the movies data file
+     * @return the parsed movie
+     */
+    public static Movie parseMovie(String line){
+        String[] fields = line.split("::");
+        Integer id = Integer.valueOf(fields[0]);
+        String name = fields[1];
+        String[] genreList = fields[2].split("\\|");
+        return new Movie(id, name, genreList);
+    }
+}
diff --git a/p4/src/main/java/cs448/Project4.java b/p4/src/main/java/cs448/Project4.java
new file mode 100644
index 0000000..3b46cc1
--- /dev/null
+++ b/p4/src/main/java/cs448/Project4.java
@@ -0,0 +1,61 @@
+package cs448;
+
+/**
+ * Skeleton holding one Spark application per project query; each method is to be completed
+ * with the processing code of its query.
+ */
+public class Project4 {
+
+    /**
+     * Runs the Spark application for Query 1.
+     *
+     * @param conf run configuration built by {@link App}
+     */
+    public void runSparkApp1(App.Conf conf){
+        System.out.println("Running Your First Spark App!");
+        /* Hint: @see App#warmupExercise() for a sample Spark application code
+         * using SparkRDD API
+         */
+
+        // Create a Spark Session.
+
+        // Write data processing code here
+
+        //Don't forget to stop spark session
+    }
+
+    /**
+     * Runs the Spark application for Query 2.
+     *
+     * @param conf run configuration built by {@link App}
+     */
+    public void runSparkApp2(App.Conf conf){
+        announce(2);
+        // Write your code here
+    }
+
+    /**
+     * Runs the Spark application for Query 3.
+     *
+     * @param conf run configuration built by {@link App}
+     */
+    public void runSparkApp3(App.Conf conf){
+        announce(3);
+        // Write your code here
+    }
+
+    /**
+     * Runs the Spark application for Query 4.
+     *
+     * @param conf run configuration built by {@link App}
+     */
+    public void runSparkApp4(App.Conf conf){
+        announce(4);
+        // Write your code here
+    }
+
+    /** Prints the start-up banner of the query with the given number. */
+    private static void announce(int queryNum) {
+        System.out.println("Running Spark App for Query " + queryNum);
+    }
+}
diff --git a/p4/src/main/java/cs448/Rating.java b/p4/src/main/java/cs448/Rating.java
new file mode 100644
index 0000000..fb77b29
--- /dev/null
+++ b/p4/src/main/java/cs448/Rating.java
@@ -0,0 +1,71 @@
+package cs448;
+
+import java.io.Serializable;
+
+/**
+ * One record of the ratings data file: user id, movie id, rating and time.
+ */
+public class Rating implements Serializable {
+    private Integer userId;
+    private Integer movieId;
+    private Integer rating;
+    private Long time;
+
+    /** Creates a rating with every field unset. */
+    public Rating(){}
+
+    /** Creates a rating from already-parsed field values. */
+    public Rating(Integer userId, Integer movieId, Integer rating, Long time) {
+        this.userId = userId;
+        this.movieId = movieId;
+        this.rating = rating;
+        this.time = time;
+    }
+
+    public Integer getUserId() {
+        return this.userId;
+    }
+
+    public void setUserId(Integer userId) {
+        this.userId = userId;
+    }
+
+    public Integer getMovieId() {
+        return this.movieId;
+    }
+
+    public void setMovieId(Integer movieId) {
+        this.movieId = movieId;
+    }
+
+    public Integer getRating() {
+        return this.rating;
+    }
+
+    public void setRating(Integer rating) {
+        this.rating = rating;
+    }
+
+    public Long getTime() {
+        return this.time;
+    }
+
+    public void setTime(Long time) {
+        this.time = time;
+    }
+
+    /**
+     * Parses one {@code ::}-separated line: user id, movie id, rating, time.
+     *
+     * @param line a line of the ratings data file
+     * @return the parsed rating
+     */
+    public static Rating parseRating(String line){
+        String[] fields = line.split("::");
+        Integer user = Integer.valueOf(fields[0]);
+        Integer movie = Integer.valueOf(fields[1]);
+        Integer stars = Integer.valueOf(fields[2]);
+        Long when = Long.valueOf(fields[3]);
+        return new Rating(user, movie, stars, when);
+    }
+}
diff --git a/p4/src/main/java/cs448/User.java b/p4/src/main/java/cs448/User.java
new file mode 100644
index 0000000..753e741
--- /dev/null
+++ b/p4/src/main/java/cs448/User.java
@@ -0,0 +1,82 @@
+package cs448;
+
+import java.io.Serializable;
+
+/**
+ * One record of the users data file: id, gender, age, occupation and zip code.
+ */
+public class User implements Serializable {
+    private Integer userId;
+    private String gender;
+    private Integer age;
+    private Integer occupation;
+    private String zipcode;
+
+    /** Creates a user with every field unset. */
+    public User(){}
+
+    /** Creates a user from already-parsed field values. */
+    public User(Integer userId, String gender, Integer age, Integer occupation, String zipcode) {
+        this.userId = userId;
+        this.gender = gender;
+        this.age = age;
+        this.occupation = occupation;
+        this.zipcode = zipcode;
+    }
+
+    public Integer getUserId() {
+        return this.userId;
+    }
+
+    public void setUserId(Integer userId) {
+        this.userId = userId;
+    }
+
+    public String getGender() {
+        return this.gender;
+    }
+
+    public void setGender(String gender) {
+        this.gender = gender;
+    }
+
+    public Integer getAge() {
+        return this.age;
+    }
+
+    public void setAge(Integer age) {
+        this.age = age;
+    }
+
+    public Integer getOccupation() {
+        return this.occupation;
+    }
+
+    public void setOccupation(Integer occupation) {
+        this.occupation = occupation;
+    }
+
+    public String getZipcode() {
+        return this.zipcode;
+    }
+
+    public void setZipcode(String zipcode) {
+        this.zipcode = zipcode;
+    }
+
+    /**
+     * Parses one {@code ::}-separated line: user id, gender, age, occupation, zip code.
+     *
+     * @param line a line of the users data file
+     * @return the parsed user
+     */
+    public static User parseUser(String line){
+        String[] fields = line.split("::");
+        Integer id = Integer.valueOf(fields[0]);
+        String sex = fields[1];
+        Integer years = Integer.valueOf(fields[2]);
+        Integer job = Integer.valueOf(fields[3]);
+        String zip = fields[4];
+        return new User(id, sex, years, job, zip);
+    }
+}