From b67d2072e71eab5204d2406e4a286f0b7be5d9a0 Mon Sep 17 00:00:00 2001
From: vinceshieh
Date: Wed, 26 Jul 2017 11:20:54 +0800
Subject: [PATCH 1/3] [SparkBench/ML] rework LogisticRegression to support
 aggregation depth setting

In this PR, we change both the data generator and the LR model to make
better use of the available computation resources.

Signed-off-by: vinceshieh
---
 bin/functions/hibench_prop_env_mapping.py     |  1 +
 bin/workloads/ml/lr/spark/run.sh              |  2 +-
 conf/workloads/ml/lr.conf                     |  3 +-
 .../sparkbench/ml/LogisticRegression.scala    | 53 +++++++-----------
 .../ml/LogisticRegressionDataGenerator.scala  | 56 ++++++++-----------
 5 files changed, 47 insertions(+), 68 deletions(-)

diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py
index ad0b21acf..4799dbc6d 100644
--- a/bin/functions/hibench_prop_env_mapping.py
+++ b/bin/functions/hibench_prop_env_mapping.py
@@ -93,6 +93,7 @@
     # For Logistic Regression
     NUM_EXAMPLES_LR="hibench.lr.examples",
     NUM_FEATURES_LR="hibench.lr.features",
+    AGGREGATION_DEPTH_LR="hibench.lr.agg.depth",
     # For ALS
     NUM_USERS="hibench.als.users",
     NUM_PRODUCTS="hibench.als.products",
diff --git a/bin/workloads/ml/lr/spark/run.sh b/bin/workloads/ml/lr/spark/run.sh
index 54ed563e4..5fda8e355 100755
--- a/bin/workloads/ml/lr/spark/run.sh
+++ b/bin/workloads/ml/lr/spark/run.sh
@@ -26,7 +26,7 @@ rmr_hdfs $OUTPUT_HDFS || true

 SIZE=`dir_size $INPUT_HDFS`
 START_TIME=`timestamp`
-run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS}
+run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS} $NUM_FEATURES_LR $AGGREGATION_DEPTH_LR
 END_TIME=`timestamp`

 gen_report ${START_TIME} ${END_TIME} ${SIZE}
diff --git a/conf/workloads/ml/lr.conf b/conf/workloads/ml/lr.conf
index 9a05e6614..b32a476de 100644
--- a/conf/workloads/ml/lr.conf
+++ b/conf/workloads/ml/lr.conf
@@ -11,10 +11,11 @@ hibench.lr.gigantic.features   500000
 hibench.lr.bigdata.examples    10000
 hibench.lr.bigdata.features    700000
-
+hibench.lr.agg.depth           2
 hibench.lr.examples            ${hibench.lr.${hibench.scale.profile}.examples}
 hibench.lr.features            ${hibench.lr.${hibench.scale.profile}.features}

 hibench.lr.partitions          ${hibench.default.map.parallelism}

 hibench.workload.input         ${hibench.hdfs.data.dir}/LR/Input
 hibench.workload.output        ${hibench.hdfs.data.dir}/LR/Output
+spark.rpc.message.maxSize      2000
diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
index 673ed2fee..c16b7c29b 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
@@ -20,47 +20,34 @@ package com.intel.hibench.sparkbench.ml

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.classification.LogisticRegression
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.sql.SparkSession

 object LogisticRegression {

   def main(args: Array[String]): Unit = {

     var inputPath = ""
-
-    if (args.length == 1) {
+    var numFeatures = 0
+    var aggDepth = 0
+    if (args.length == 3) {
       inputPath = args(0)
+      numFeatures = args(1).toInt
+      aggDepth = if (args(2).toInt < 2) 2 else args(2).toInt
     }
-
-    val conf = new SparkConf()
-      .setAppName("LogisticRegressionWithLBFGS")
-    val sc = new SparkContext(conf)
-
-    // $example on$
-    // Load training data in LIBSVM format.
-    val data: RDD[LabeledPoint] = sc.objectFile(inputPath)
-
-    // Split data into training (60%) and test (40%).
-    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
-    val training = splits(0).cache()
-    val test = splits(1)
-
+    val spark = SparkSession.builder.appName("LogisticRegression").getOrCreate()
+    val df = spark.read.format("libsvm")
+      .option("numFeatures", numFeatures)
+      .load(inputPath)
     // Run training algorithm to build the model
-    val model = new LogisticRegressionWithLBFGS()
-      .setNumClasses(10)
-      .run(training)
-
-    // Compute raw scores on the test set.
-    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
-      val prediction = model.predict(features)
-      (prediction, label)
-    }
-
-    val accuracy = predictionAndLabels.filter(x => x._1 == x._2).count().toDouble / predictionAndLabels.count()
-    println(s"Accuracy = $accuracy")
-
-    sc.stop()
+    val model = new LogisticRegression()
+      .setMaxIter(30)
+      .setRegParam(0.3)
+      .setElasticNetParam(0.8)
+      .setAggregationDepth(aggDepth)
+      .fit(df)
+    println(s"training complete!")
+    spark.stop()
   }
 }
 // scalastyle:on println
diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
index f9d408fa0..f002b7040 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
@@ -23,9 +23,8 @@ import scala.util.Random

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.SparkSession

 /**
  * :: DeveloperApi ::
@@ -33,40 +32,36 @@ import org.apache.spark.rdd.RDD
  * with probability `probOne` and scales features for positive examples by `eps`.
  */
 object LogisticRegressionDataGenerator {
-
   /**
-   * Generate an RDD containing test data for LogisticRegression.
+   * Generate a DataFrame containing test data for LogisticRegression.
    *
-   * @param sc SparkContext to use for creating the RDD.
-   * @param nexamples Number of examples that will be contained in the RDD.
+   * @param nexamples Number of examples that will be contained in the data.
    * @param nfeatures Number of features to generate for each example.
    * @param eps Epsilon factor by which positive examples are scaled.
-   * @param nparts Number of partitions of the generated RDD. Default value is 2.
+   * @param nparts Number of partitions of the generated DataFrame. Default value is 2.
    * @param probOne Probability that a label is 1 (and not 0). Default value is 0.5.
    */
-  def generateLogisticRDD(
-    sc: SparkContext,
+
+  val spark = SparkSession.builder.appName("LogisticRegressionDataGenerator").getOrCreate()
+  val sc = spark.sparkContext
+  def generateLogisticDF(
       nexamples: Int,
       nfeatures: Int,
       eps: Double,
       nparts: Int = 2,
-      probOne: Double = 0.5): RDD[LabeledPoint] = {
-    val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
-      val rnd = new Random(42 + idx)
-
-      val y = if (idx % 2 == 0) 0.0 else 1.0
-      val x = Array.fill[Double](nfeatures) {
-        rnd.nextGaussian() + (y * eps)
-      }
-      LabeledPoint(y, Vectors.dense(x))
+      probOne: Double = 0.5) = {
+    val data: Seq[(Double, org.apache.spark.ml.linalg.Vector)] = Seq.tabulate(nexamples)(idx => {
+      val rnd = new Random(42 + idx)
+      val y = if (idx % 2 == 0) 0.0 else 1.0
+      val x = Array.fill[Double](nfeatures) {
+        rnd.nextGaussian() + (y * eps)
+      }
+      (y, Vectors.dense(x))
+    })
+    spark.createDataFrame(data).toDF("label","features").repartition(nparts)
     }
-    data
-  }

   def main(args: Array[String]) {
-    val conf = new SparkConf().setAppName("LogisticRegressionDataGenerator")
-    val sc = new SparkContext(conf)
-
     var outputPath = ""
     var numExamples: Int = 200000
     var numFeatures: Int = 20
@@ -79,20 +74,15 @@ object LogisticRegressionDataGenerator {
       outputPath = args(0)
       numExamples = args(1).toInt
       numFeatures = args(2).toInt
-      println(s"Output Path: $outputPath")
-      println(s"Num of Examples: $numExamples")
-      println(s"Num of Features: $numFeatures")
+      println(s"Output Path: $outputPath, Num of Examples: $numExamples, Num of Features: $numFeatures")
     } else {
       System.err.println(
         s"Usage: $LogisticRegressionDataGenerator <outputPath> <numExamples> <numFeatures>"
       )
       System.exit(1)
     }
-
-    val data = generateLogisticRDD(sc, numExamples, numFeatures, eps, numPartitions)
-
-    data.saveAsObjectFile(outputPath)
-
-    sc.stop()
+    val df = generateLogisticDF(numExamples, numFeatures, eps, numPartitions)
+    df.write.format("libsvm").save(outputPath)
+    spark.stop()
   }
 }

From cae07411ca36ce5f8b1c4c6a60a1f39f9bd127a8 Mon Sep 17 00:00:00 2001
From: vinceshieh
Date: Wed, 26 Jul 2017 22:10:50 +0800
Subject: [PATCH 2/3] [SparkBench/ML] fix OOM issue in data generation at bigdata scale

Signed-off-by: vinceshieh
---
 conf/workloads/ml/lr.conf                     |  1 +
 .../ml/LogisticRegressionDataGenerator.scala  | 14 +++++++-------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/conf/workloads/ml/lr.conf b/conf/workloads/ml/lr.conf
index b32a476de..991d5b8d2 100644
--- a/conf/workloads/ml/lr.conf
+++ b/conf/workloads/ml/lr.conf
@@ -19,3 +19,4 @@ hibench.lr.partitions          ${hibench.default.map.parallelism}
 hibench.workload.input         ${hibench.hdfs.data.dir}/LR/Input
 hibench.workload.output        ${hibench.hdfs.data.dir}/LR/Output
 spark.rpc.message.maxSize      2000
+spark.driver.maxResultSize     4g
diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
index f002b7040..f483f5477 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
@@ -21,9 +21,9 @@ import com.intel.hibench.sparkbench.common.IOCommon

 import scala.util.Random

-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.sql.SparkSession

 /**
@@ -50,15 +50,15 @@ object LogisticRegressionDataGenerator {
       eps: Double,
       nparts: Int = 2,
       probOne: Double = 0.5) = {
-    val data: Seq[(Double, org.apache.spark.ml.linalg.Vector)] = Seq.tabulate(nexamples)(idx => {
+    val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
       val rnd = new Random(42 + idx)
       val y = if (idx % 2 == 0) 0.0 else 1.0
-      val x = Array.fill[Double](nfeatures) {
-        rnd.nextGaussian() + (y * eps)
+       val x = Array.fill[Double](nfeatures) {
+         rnd.nextGaussian() + (y * eps)
       }
-      (y, Vectors.dense(x))
-    })
-    spark.createDataFrame(data).toDF("label","features").repartition(nparts)
+      LabeledPoint(y, Vectors.dense(x))
+    }
+    spark.createDataFrame(data)
     }

   def main(args: Array[String]) {

From 32af5047ca3875cba4ae9bdb5302a8138cdd938e Mon Sep 17 00:00:00 2001
From: vinceshieh
Date: Fri, 11 Aug 2017 14:22:28 +0800
Subject: [PATCH 3/3] [SparkBench/ML] fix compilation issue for Spark 2.0

setAggregationDepth is only supported in Spark 2.1 and above.

Signed-off-by: vinceshieh
---
 .../scala/com/intel/sparkbench/ml/LogisticRegression.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
index c16b7c29b..910613e80 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
@@ -44,7 +44,8 @@ object LogisticRegression {
       .setMaxIter(30)
       .setRegParam(0.3)
       .setElasticNetParam(0.8)
-      .setAggregationDepth(aggDepth)
+//    Uncomment the line below when building against Spark 2.1 or above
+//      .setAggregationDepth(aggDepth)
       .fit(df)
     println(s"training complete!")
     spark.stop()
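
Editor's note (not part of the patches): with PATCH 3 applied, a Spark 2.0 build simply ignores the hibench.lr.agg.depth setting because the setAggregationDepth call is commented out. An alternative that keeps one source tree working on both Spark 2.0 and 2.1+ is to invoke the setter reflectively only when it exists. The following is a minimal sketch under that assumption; the AggregationDepthCompat object and withAggregationDepth helper are illustrative names and are not part of HiBench.

import org.apache.spark.ml.classification.LogisticRegression

object AggregationDepthCompat {
  // Look up setAggregationDepth at runtime: on Spark 2.1+ the method exists and
  // is invoked; on Spark 2.0 it is absent and the estimator is returned unchanged,
  // so the caller silently falls back to Spark's default aggregation depth (2).
  def withAggregationDepth(lr: LogisticRegression, depth: Int): LogisticRegression = {
    try {
      val m = lr.getClass.getMethod("setAggregationDepth", classOf[Int])
      m.invoke(lr, Int.box(depth)).asInstanceOf[LogisticRegression]
    } catch {
      case _: NoSuchMethodException => lr
    }
  }
}

The workload could then build the model as, for example:
val model = AggregationDepthCompat.withAggregationDepth(
  new LogisticRegression().setMaxIter(30).setRegParam(0.3).setElasticNetParam(0.8),
  aggDepth).fit(df)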