Initial commit of P4

This commit is contained in:
Max O'Cull 2019-04-20 13:44:14 -04:00
parent b3c3961130
commit 3d940d00e6
8 changed files with 677 additions and 0 deletions

55
p4/pom.xml Normal file
View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cs448</groupId>
    <artifactId>p4</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Compile deterministically regardless of the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- Pin the plugin version so builds are reproducible and
                     Maven does not emit an unversioned-plugin warning. -->
                <version>3.1.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>cs448.App</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

BIN
p4/src/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,364 @@
package cs448;
import org.apache.commons.cli.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple4;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
public class App {
    // Command-line option schema, shared by main() and printHelp().
    private static Options options;
    private static CommandLineParser parser;
    // Spark session used only to verify query output when running in test mode.
    private static SparkSession ssTest;

    /**
     * Entry point. Parses the command line, builds the run configuration, and
     * dispatches to either the warm-up exercise or one of the four queries.
     */
    public static void main(String[] args) {
        initOptions();
        try {
            // parse the command line arguments
            CommandLine line = parser.parse(options, args);
            // The HDFS input directory is mandatory in every mode.
            if (!line.hasOption("i")) {
                printHelp();
                appExit();
            }
            String inPath = line.getOptionValue("i");
            String outPath = line.getOptionValue("o");
            String userFName = line.getOptionValue("u", "users.dat");
            String moviesFName = line.getOptionValue("m", "movies.dat");
            String ratingsFName = line.getOptionValue("r", "ratings.dat");
            Conf conf = new Conf(inPath, outPath, userFName, moviesFName, ratingsFName);
            if (line.hasOption("warmup")) {
                warmupExercise(conf);
            } else {
                // Query mode requires -q in the form: QueryNumber,param1,param2,...
                if (!line.hasOption("q")) {
                    System.out.println("Query parameters are not supplied. Please supply query parameters");
                    printHelp();
                    appExit();
                }
                String qps = line.getOptionValue("q");
                String[] params = qps.split(",");
                if (params.length < 2) {
                    // Every query needs at least the query number plus one parameter.
                    System.out.println("Query parameters are not formatted properly");
                    printHelp();
                    appExit();
                }
                conf.appNum = Integer.parseInt(params[0]);
                conf.queryParams = params;
                if (line.hasOption("test")) {
                    conf.testMode = true;
                    System.out.println("Testing mode. Ignoring query parameters and using built-in values.");
                }
                run(conf);
            }
        } catch (ParseException exp) {
            // oops, something went wrong
            System.err.println("Parsing failed. Reason: " + exp.getMessage());
        }
    }

    /** Prints usage information for all supported command-line options. */
    private static void printHelp() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("p4", options);
    }

    /** Terminates the JVM; called after a usage error has been reported. */
    private static void appExit() {
        System.exit(0);
    }

    /** Defines the command-line options and silences verbose Spark logging. */
    private static void initOptions() {
        options = new Options();
        options.addOption("i", "input-directory", true, "(required) path to input directory in HDFS");
        options.addOption("o", "output-directory", true, "path to output directory on HDFS");
        options.addOption("u", "user-data-filename", true, "User data filename");
        options.addOption("m", "movies-data-filename", true, "Movies data filename");
        options.addOption("r", "rating-data-filename", true, "Ratings data filename");
        options.addOption("q", "query-params", true, "Selects what query to execute and its parameters. Comma-separated format: QueryNumber,param1,param2,...");
        options.addOption("warmup", false, "Run warm-up exercise");
        options.addOption("test", false, "Run test - using fixed parameters");
        parser = new BasicParser();
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
    }

    /**
     * Warm-up exercise: reads each data file into an RDD of parsed records and
     * prints the line count of every file.
     */
    private static void warmupExercise(Conf conf) {
        System.out.println("*** WARM-UP EXERCISE ***");
        // Creating a Spark session
        SparkSession spark = SparkSession.builder().appName("CS 448 Project 4 -- Warmup Exercise").getOrCreate();
        // Reading, parsing and counting lines for each of the data files
        JavaRDD<User> userRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.usersFName)).cache()
                .javaRDD()
                .map(User::parseUser);
        long lineCount = userRDD.count();
        System.out.println("Total lines in data file ( " + conf.usersFName + " ) : " + lineCount);
        JavaRDD<Movie> movieRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.moviesFName)).cache()
                .javaRDD()
                .map(Movie::parseMovie);
        lineCount = movieRDD.count();
        System.out.println("Total lines in data file ( " + conf.moviesFName + " ) : " + lineCount);
        JavaRDD<Rating> ratingRDD = spark.read().textFile(CS448Utils.resolveUri(conf.inPath, conf.ratingsFName)).cache()
                .javaRDD()
                .map(Rating::parseRating);
        lineCount = ratingRDD.count();
        System.out.println("Total lines in data file ( " + conf.ratingsFName + " ) : " + lineCount);
        // Terminating the Spark session
        spark.stop();
    }

    /**
     * Parses the per-query parameters into {@code conf} and dispatches to the
     * matching Project4 application. In test mode the previous test output
     * directory is removed first and the result is verified afterwards.
     */
    private static void run(Conf conf) {
        Project4 p4 = new Project4();
        if (conf.testMode) {
            conf.outPath += "/test";
            ssTest = SparkSession.builder().appName("CS 448 Project 4 -- Test session").getOrCreate();
            String testPath = CS448Utils.resolveUri(conf.outPath);
            try {
                // Remove stale output from a previous test run so Spark can write anew.
                FileSystem hdfs = FileSystem.get(ssTest.sparkContext().hadoopConfiguration());
                if (hdfs.exists(new Path(testPath)))
                    hdfs.delete(new Path(testPath), true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        switch (conf.appNum) {
            case 2:
                try {
                    if (conf.testMode) {
                        conf.q2Occupation1 = CS448Utils.TEST_q2Occupation1;
                        conf.q2Occupation2 = CS448Utils.TEST_q2Occupation2;
                    } else {
                        conf.q2Occupation1 = Integer.parseInt(conf.queryParams[1]);
                        conf.q2Occupation2 = Integer.parseInt(conf.queryParams[2]);
                    }
                } catch (Exception e) {
                    reportParamError(conf);
                }
                p4.runSparkApp2(conf);
                break;
            case 3:
                try {
                    if (conf.testMode) {
                        conf.q3Rating = CS448Utils.TEST_q3Rating;
                        conf.q3Occupation = CS448Utils.TEST_q3Occupation;
                    } else {
                        conf.q3Rating = Integer.parseInt(conf.queryParams[1]);
                        conf.q3Occupation = Integer.parseInt(conf.queryParams[2]);
                    }
                } catch (Exception e) {
                    reportParamError(conf);
                }
                p4.runSparkApp3(conf);
                break;
            case 4:
                try {
                    if (conf.testMode) {
                        conf.q4Age = CS448Utils.TEST_q4Age;
                    } else {
                        conf.q4Age = Integer.parseInt(conf.queryParams[1]);
                    }
                } catch (Exception e) {
                    reportParamError(conf);
                }
                p4.runSparkApp4(conf);
                break;
            default:
                // Any other value falls through to Query 1.
                assert (conf.appNum == 1);
                try {
                    if (conf.testMode) {
                        conf.q1Occupation = CS448Utils.TEST_q1Occupation;
                        conf.q1Rating = CS448Utils.TEST_q1Rating;
                    } else {
                        conf.q1Occupation = Integer.parseInt(conf.queryParams[1]);
                        conf.q1Rating = Integer.parseInt(conf.queryParams[2]);
                    }
                } catch (Exception e) {
                    reportParamError(conf);
                }
                p4.runSparkApp1(conf);
                break;
        }
        if (conf.testMode) {
            testQuery(conf);
        }
    }

    /** Reports a malformed -q parameter list for the selected query, prints usage, and exits. */
    private static void reportParamError(Conf conf) {
        System.out.println(String.format("Error parsing Query %d Query parameters format error.", conf.appNum));
        printHelp();
        appExit();
    }

    /**
     * Verifies the output of query {@code conf.appNum} against the reference
     * output stored in HDFS. Queries 1-3 are compared line-by-line; Query 4
     * additionally compares the per-key averages with a 0.1 tolerance.
     */
    private static void testQuery(Conf conf) {
        // test query output
        boolean res = false;
        int queryNum = conf.appNum;
        System.out.println(String.format("Running test for Query %d", queryNum));
        ssTest = SparkSession.builder().appName(String.format("CS 448 Project 4 -- Test session for Q%d", queryNum)).getOrCreate();
        // Compare results
        if (queryNum != 4) {
            // Tag each line with a count of 1 so a full outer join can detect
            // lines present in only one of the two result sets.
            JavaPairRDD<String, Long> qResultRDDTest = ssTest.read().textFile(CS448Utils.getTestUri(queryNum)).javaRDD().mapToPair(l -> new Tuple2<String, Long>(l, 1L));
            JavaPairRDD<String, Long> qResultRDD = ssTest.read().textFile(CS448Utils.resolveUri(conf.outPath + String.format("/query-%d", queryNum))).javaRDD().mapToPair(l -> new Tuple2<String, Long>(l, 1L));
            JavaPairRDD<String, Tuple2<Optional<Long>, Optional<Long>>> joined = qResultRDD.fullOuterJoin(qResultRDDTest).cache();
            long test_cnt = qResultRDDTest.count();
            long ans_cnt = qResultRDD.count();
            long join_cnt = joined.count();
            // Equal counts across both sets and the join imply identical key sets.
            res = (test_cnt == ans_cnt) && test_cnt == join_cnt;
        } else {
            // Query 4 output lines look like "key::average"; split out the average.
            JavaPairRDD<String, Double> q4ResultRDDTest = ssTest.read().textFile(CS448Utils.getTestUri(queryNum)).javaRDD().mapToPair(l -> {
                String[] ol = l.split("::");
                return new Tuple2<String, Double>(ol[0], Double.parseDouble(ol[1]));
            });
            JavaPairRDD<String, Double> q4ResultRDD = ssTest.read().textFile(CS448Utils.resolveUri(conf.outPath + String.format("/query-%d", queryNum))).javaRDD().mapToPair(l -> {
                String[] ol = l.split("::");
                return new Tuple2<String, Double>(ol[0], Double.parseDouble(ol[1]));
            });
            JavaPairRDD<String, Tuple2<Optional<Double>, Optional<Double>>> joined = q4ResultRDD.fullOuterJoin(q4ResultRDDTest).cache();
            long test_cnt = q4ResultRDDTest.count();
            long ans_cnt = q4ResultRDD.count();
            long join_cnt = joined.count();
            res = (test_cnt == ans_cnt) && test_cnt == join_cnt;
            // check values
            if (res) {
                JavaRDD<Boolean> avgOk = joined.map(p -> {
                    // Left side of the join is this run's answer, right side is the reference.
                    Optional<Double> answer = p._2()._1();
                    Optional<Double> expected = p._2()._2();
                    if (answer.isPresent() && expected.isPresent()) {
                        // Averages are accepted within an absolute tolerance of 0.1.
                        if (Math.abs(answer.get() - expected.get()) <= 0.1F) {
                            return Boolean.TRUE;
                        } else {
                            return Boolean.FALSE;
                        }
                    } else {
                        return Boolean.FALSE;
                    }
                });
                // All per-key comparisons must hold for the test to pass.
                res = avgOk.fold(true, (a, b) -> a && b);
            }
        }
        if (res) {
            System.out.println(String.format("Test for Query %d PASSED", queryNum));
        } else {
            System.out.println(String.format("Test for Query %d FAILED!!!", queryNum));
        }
        ssTest.stop();
    }

    /**
     * Run configuration shared with the Project4 query implementations:
     * input/output paths, data file names, and per-query parameters.
     */
    public static class Conf implements Serializable {
        String inPath, outPath, usersFName, moviesFName, ratingsFName;
        // Which query (1-4) to run.
        int appNum;
        // Raw -q values; index 0 is the query number.
        String[] queryParams;
        int q1Occupation, q1Rating;
        int q2Occupation1, q2Occupation2;
        int q3Occupation, q3Rating;
        int q4Age;
        // When true, fixed built-in parameters are used and output is verified.
        boolean testMode;

        public Conf(String inPath, String outPath, String usersFName, String moviesFName, String ratingsFName) {
            this.inPath = inPath;
            this.outPath = outPath;
            this.usersFName = usersFName;
            this.moviesFName = moviesFName;
            this.ratingsFName = ratingsFName;
            testMode = false;
        }

        public String getInPath() {
            return inPath;
        }

        public void setInPath(String inPath) {
            this.inPath = inPath;
        }

        public String getOutPath() {
            return outPath;
        }

        public void setOutPath(String outPath) {
            this.outPath = outPath;
        }

        public String getUserFName() {
            return usersFName;
        }

        public void setUserFName(String userFName) {
            this.usersFName = userFName;
        }

        public String getMoviesFName() {
            return moviesFName;
        }

        public void setMoviesFName(String moviesFName) {
            this.moviesFName = moviesFName;
        }

        public String getRatingsFName() {
            return ratingsFName;
        }

        public void setRatingsFName(String ratingsFName) {
            this.ratingsFName = ratingsFName;
        }
    }
}

View File

@ -0,0 +1,42 @@
package cs448;
public class CS448Utils {
    /** Base URI of the HDFS namenode every path is resolved against. */
    public static String HDFS_URI = "hdfs://scholar-h000.rcac.purdue.edu:8020";
    //Testing values for Q1
    public static int TEST_q1Rating = 3;
    public static int TEST_q1Occupation = 12;
    //Testing values for Q2
    public static int TEST_q2Occupation1 = 12;
    public static int TEST_q2Occupation2 = 11;
    //Testing values for Q3
    public static int TEST_q3Occupation = 12;
    public static int TEST_q3Rating = 3;
    //Testing values for Q4
    public static int TEST_q4Age = 18;

    /** Utility class: not instantiable. */
    private CS448Utils() {
    }

    /**
     * Resolves a path to a fully-qualified HDFS URI.
     *
     * @param path path relative to the HDFS root (no leading slash expected)
     * @return {@code HDFS_URI + "/" + path}
     */
    public static String resolveUri(String path) {
        return HDFS_URI + '/' + path;
    }

    /**
     * Resolves a directory/filename pair to a fully-qualified HDFS URI.
     *
     * @param dirPath  directory path relative to the HDFS root
     * @param filename file name inside {@code dirPath}
     * @return {@code HDFS_URI + "/" + dirPath + "/" + filename}
     */
    public static String resolveUri(String dirPath, String filename) {
        return resolveUri(dirPath + '/' + filename);
    }

    /**
     * Returns the HDFS URI of the instructor-provided reference output for
     * query {@code i}. Note the argument already starts with a slash, so the
     * resulting URI contains a double slash after the authority (HDFS accepts
     * this; kept for backward compatibility).
     */
    public static String getTestUri(int i) {
        return resolveUri(String.format("/user/tqadah/test/query-%d", i));
    }
}

View File

@ -0,0 +1,51 @@
package cs448;
import java.io.Serializable;
import java.util.List;
public class Movie implements Serializable {
    // Numeric identifier of the movie.
    private Integer movieId;
    // Title string, typically including the release year.
    private String title;
    // Genre labels attached to the movie.
    private String[] genres;

    /** No-argument constructor, required for serialization frameworks. */
    public Movie() {}

    public Movie(Integer movieId, String title, String[] genre) {
        this.movieId = movieId;
        this.title = title;
        this.genres = genre;
    }

    public Integer getMovieId() {
        return movieId;
    }

    public void setMovieId(Integer movieId) {
        this.movieId = movieId;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String[] getGenres() {
        return genres;
    }

    public void setGenres(String[] genres) {
        this.genres = genres;
    }

    /**
     * Parses a single line of the movies data file.
     * Expected format: {@code movieId::title::genre1|genre2|...}
     */
    public static Movie parseMovie(String line) {
        String[] fields = line.split("::");
        Integer id = Integer.parseInt(fields[0]);
        String[] genreList = fields[2].split("\\|");
        return new Movie(id, fields[1], genreList);
    }
}

View File

@ -0,0 +1,31 @@
package cs448;
public class Project4 {
    /** Spark application for Query 1 (implementation left to the student). */
    public void runSparkApp1(App.Conf conf) {
        System.out.println("Running Your First Spark App!");
        // A complete sample using the Spark RDD API lives in App#warmupExercise():
        //   1. create a Spark session,
        //   2. write the data-processing logic,
        //   3. stop the session when done.
    }

    /** Spark application for Query 2 (implementation left to the student). */
    public void runSparkApp2(App.Conf conf) {
        System.out.println("Running Spark App for Query 2");
        // Implementation goes here.
    }

    /** Spark application for Query 3 (implementation left to the student). */
    public void runSparkApp3(App.Conf conf) {
        System.out.println("Running Spark App for Query 3");
        // Implementation goes here.
    }

    /** Spark application for Query 4 (implementation left to the student). */
    public void runSparkApp4(App.Conf conf) {
        System.out.println("Running Spark App for Query 4");
        // Implementation goes here.
    }
}

View File

@ -0,0 +1,61 @@
package cs448;
import java.io.Serializable;
public class Rating implements Serializable {
    // Identifier of the user who submitted the rating.
    private Integer userId;
    // Identifier of the rated movie.
    private Integer movieId;
    // The rating value itself.
    private Integer rating;
    // Timestamp of the rating as stored in the data file.
    private Long time;

    /** No-argument constructor, required for serialization frameworks. */
    public Rating() {}

    public Rating(Integer userId, Integer movieId, Integer rating, Long time) {
        this.userId = userId;
        this.movieId = movieId;
        this.rating = rating;
        this.time = time;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getMovieId() {
        return movieId;
    }

    public void setMovieId(Integer movieId) {
        this.movieId = movieId;
    }

    public Integer getRating() {
        return rating;
    }

    public void setRating(Integer rating) {
        this.rating = rating;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    /**
     * Parses a single line of the ratings data file.
     * Expected format: {@code userId::movieId::rating::timestamp}
     */
    public static Rating parseRating(String line) {
        String[] fields = line.split("::");
        return new Rating(
                Integer.parseInt(fields[0]),
                Integer.parseInt(fields[1]),
                Integer.parseInt(fields[2]),
                Long.parseLong(fields[3]));
    }
}

View File

@ -0,0 +1,73 @@
package cs448;
import java.io.Serializable;
public class User implements Serializable {
    // Numeric identifier of the user.
    private Integer userId;
    // Gender code as stored in the data file.
    private String gender;
    // Age value as stored in the data file.
    private Integer age;
    // Occupation code as stored in the data file.
    private Integer occupation;
    // Postal code string.
    private String zipcode;

    /** No-argument constructor, required for serialization frameworks. */
    public User() {}

    public User(Integer userId, String gender, Integer age, Integer occupation, String zipcode) {
        this.userId = userId;
        this.gender = gender;
        this.age = age;
        this.occupation = occupation;
        this.zipcode = zipcode;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getOccupation() {
        return occupation;
    }

    public void setOccupation(Integer occupation) {
        this.occupation = occupation;
    }

    public String getZipcode() {
        return zipcode;
    }

    public void setZipcode(String zipcode) {
        this.zipcode = zipcode;
    }

    /**
     * Parses a single line of the users data file.
     * Expected format: {@code userId::gender::age::occupation::zipcode}
     */
    public static User parseUser(String line) {
        String[] fields = line.split("::");
        return new User(
                Integer.parseInt(fields[0]),
                fields[1],
                Integer.parseInt(fields[2]),
                Integer.parseInt(fields[3]),
                fields[4]);
    }
}