From fad4bf90987fa99e04d309bd5307d661a511de97 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Wed, 21 Jan 2015 23:58:59 +0000 Subject: [PATCH] SPARK-3278 changes after PR comments https://github.com/apache/spark/pull/3519 --- .../mllib/regression/IsotonicRegression.scala | 70 +++++++----------- .../JavaIsotonicRegressionSuite.java | 16 ++-- .../regression/IsotonicRegressionSuite.scala | 74 +++++++++++-------- .../mllib/util/IsotonicDataGenerator.scala | 6 +- 4 files changed, 82 insertions(+), 84 deletions(-) rename mllib/src/{main => test}/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala (99%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 90c1f5a7347bc..b2f2d0c3ce39a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -17,7 +17,9 @@ package org.apache.spark.mllib.regression -import org.apache.spark.api.java.{JavaRDD, JavaPairRDD} +import java.io.Serializable + +import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} import org.apache.spark.rdd.RDD /** @@ -46,8 +48,8 @@ class IsotonicRegressionModel ( * @param testData features to be labeled * @return predicted labels */ - def predict(testData: JavaRDD[java.lang.Double]): JavaRDD[java.lang.Double] = - testData.rdd.map(_.doubleValue()).map(predict).map(new java.lang.Double(_)) + def predict(testData: JavaRDD[java.lang.Double]): JavaDoubleRDD = + JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]])) /** * Predict a single label @@ -61,23 +63,12 @@ class IsotonicRegressionModel ( } /** - * Base representing algorithm for isotonic regression + * Isotonic regression + * Currently implemented using oarallel pool adjacent violators algorithm for monotone regression */ -trait IsotonicRegressionAlgorithm +class IsotonicRegression extends Serializable { - /** - * Creates isotonic regression model with given parameters - * - * @param predictions labels estimated using isotonic regression algorithm. - * Used for predictions on new data points. - * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence - * @return isotonic regression model - */ - protected def createModel( - predictions: Seq[(Double, Double, Double)], - isotonic: Boolean): IsotonicRegressionModel - /** * Run algorithm to obtain isotonic regression model * @@ -86,17 +77,6 @@ trait IsotonicRegressionAlgorithm * @return isotonic regression model */ def run( - input: RDD[(Double, Double, Double)], - isotonic: Boolean): IsotonicRegressionModel -} - -/** - * Parallel pool adjacent violators algorithm for monotone regression - */ -class PoolAdjacentViolators private [mllib] - extends IsotonicRegressionAlgorithm { - - override def run( input: RDD[(Double, Double, Double)], isotonic: Boolean = true): IsotonicRegressionModel = { createModel( @@ -104,7 +84,15 @@ class PoolAdjacentViolators private [mllib] isotonic) } - override protected def createModel( + /** + * Creates isotonic regression model with given parameters + * + * @param predictions labels estimated using isotonic regression algorithm. + * Used for predictions on new data points. + * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence + * @return isotonic regression model + */ + protected def createModel( predictions: Seq[(Double, Double, Double)], isotonic: Boolean): IsotonicRegressionModel = { new IsotonicRegressionModel(predictions, isotonic) @@ -132,31 +120,27 @@ class PoolAdjacentViolators private [mllib] val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum val weight = poolSubArray.map(_._3).sum - for(i <- start to end) { + var i = start + while (i <= end) { in(i) = (weightedSum / weight, in(i)._2, in(i)._3) + i = i + 1 } } - val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y - val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y - - def monotonicityConstraint(isotonic: Boolean) = - if(isotonic) isotonicConstraint else antitonicConstraint - - val monotonicityConstraintHolds = monotonicityConstraint(isotonic) + val monotonicityConstraintHolds: (Double, Double) => Boolean = + (x, y) => if (isotonic) x <= y else x >= y var i = 0 - - while(i < in.length) { + while (i < in.length) { var j = i // Find monotonicity violating sequence, if any - while(j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) { + while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) { j = j + 1 } // If monotonicity was not violated, move to next data point - if(i == j) { + if (i == j) { i = i + 1 } else { // Otherwise pool the violating sequence @@ -212,7 +196,7 @@ object IsotonicRegression { def train( input: RDD[(Double, Double, Double)], isotonic: Boolean = true): IsotonicRegressionModel = { - new PoolAdjacentViolators().run(input, isotonic) + new IsotonicRegression().run(input, isotonic) } /** @@ -227,7 +211,7 @@ object IsotonicRegression { def train( input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)], isotonic: Boolean): IsotonicRegressionModel = { - new PoolAdjacentViolators() + new IsotonicRegression() .run( input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())), isotonic) diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java index e9ea4fb6f24eb..de91a0fa3b3f5 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java @@ -17,18 +17,20 @@ package org.apache.spark.mllib.regression; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.util.IsotonicDataGenerator; +import java.io.Serializable; +import java.util.List; + +import scala.Tuple3; + import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import scala.Tuple3; -import java.io.Serializable; -import java.util.List; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.util.IsotonicDataGenerator; public class JavaIsotonicRegressionSuite implements Serializable { private transient JavaSparkContext sc; diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index fd48755875e03..510528a57309e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} import org.scalatest.{Matchers, FunSuite} -import scala.util.Random + +import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} import org.apache.spark.mllib.util.IsotonicDataGenerator._ class IsotonicRegressionSuite @@ -32,26 +31,34 @@ class IsotonicRegressionSuite Math.round(d * 100).toDouble / 100 test("increasing isotonic regression") { - val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache() + val trainRDD = sc.parallelize( + generateIsotonicInput( + 1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) + model.predictions should be( + generateIsotonicInput( + 1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) } test("increasing isotonic regression using api") { - val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache() + val trainRDD = sc.parallelize( + generateIsotonicInput( + 1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache() val model = IsotonicRegression.train(trainRDD, true) - model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) + model.predictions should be( + generateIsotonicInput( + 1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) } test("isotonic regression with size 0") { val trainRDD = sc.parallelize(List[(Double, Double, Double)]()).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(List()) @@ -60,7 +67,7 @@ class IsotonicRegressionSuite test("isotonic regression with size 1") { val trainRDD = sc.parallelize(generateIsotonicInput(1)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(1)) @@ -69,7 +76,7 @@ class IsotonicRegressionSuite test("isotonic regression strictly increasing sequence") { val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5)) @@ -78,7 +85,7 @@ class IsotonicRegressionSuite test("isotonic regression strictly decreasing sequence") { val trainRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3)) @@ -87,7 +94,7 @@ class IsotonicRegressionSuite test("isotonic regression with last element violating monotonicity") { val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3)) @@ -96,7 +103,7 @@ class IsotonicRegressionSuite test("isotonic regression with first element violating monotonicity") { val trainRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5)) @@ -105,7 +112,7 @@ class IsotonicRegressionSuite test("isotonic regression with negative labels") { val trainRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0)) @@ -114,45 +121,48 @@ class IsotonicRegressionSuite test("isotonic regression with unordered input") { val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5)) } test("weighted isotonic regression") { - val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache() + val trainRDD = sc.parallelize( + generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2))) + model.predictions should be( + generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2))) } test("weighted isotonic regression with weights lower than 1") { - val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache() + val trainRDD = sc.parallelize( + generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions.map(p => p.copy(_1 = round(p._1))) should be - (generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1))) + model.predictions.map(p => p.copy(_1 = round(p._1))) should be( + generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1))) } test("weighted isotonic regression with negative weights") { val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions.map(p => p.copy(_1 = round(p._1))) should be - (generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5))) + model.predictions should be( + generateWeightedIsotonicInput(Seq(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6), Seq(-1, 1, -3, 1, -5))) } test("weighted isotonic regression with zero weights") { val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0))) @@ -161,7 +171,7 @@ class IsotonicRegressionSuite test("isotonic regression prediction") { val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) model.predict(0) should be(1) @@ -172,18 +182,18 @@ class IsotonicRegressionSuite test("isotonic regression RDD prediction") { val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache() - val testRDD = sc.parallelize(List(0d, 2d, 3d, 10d)).cache() + val testRDD = sc.parallelize(List(0.0, 2.0, 3.0, 10.0)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predict(testRDD).collect() should be(Array(1, 2, 10d/3, 10d/3)) + model.predict(testRDD).collect() should be(Array(1, 2, 10.0/3, 10.0/3)) } test("antitonic regression prediction") { val trainRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache() - val alg = new PoolAdjacentViolators + val alg = new IsotonicRegression val model = alg.run(trainRDD, false) model.predict(0) should be(7) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala similarity index 99% rename from mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala rename to mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala index ecd5b3cd6259b..2aaa5b72b97d2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala @@ -17,10 +17,12 @@ package org.apache.spark.mllib.util -import org.apache.spark.annotation.DeveloperApi -import scala.collection.JavaConversions._ import java.lang.{Double => JDouble} +import scala.collection.JavaConversions._ + +import org.apache.spark.annotation.DeveloperApi + /** * :: DeveloperApi :: * Generate test data for Isotonic regresision.