From 88eb4e2edd8b8c8c66c6d16544032586b1820975 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Fri, 30 Jan 2015 19:46:54 +0100 Subject: [PATCH] SPARK-3278 changes after PR comments https://github.com/apache/spark/pull/3519. Isotonic parameter removed from algorithm, defined behaviour for multiple data points with the same feature value, added tests to verify it --- .../mllib/regression/IsotonicRegression.scala | 87 ++++++++++--------- .../regression/IsotonicRegressionSuite.scala | 24 +++++ 2 files changed, 70 insertions(+), 41 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 9b67807b92f02..4fd7904d026c6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD * @param boundaries Array of boundaries for which predictions are known. * Boundaries must be sorted in increasing order. * @param predictions Array of predictions associated to the boundaries at the same index. - * Result of isotonic regression and therefore is monotone. + * Results of isotonic regression and therefore monotone. */ class IsotonicRegressionModel ( boundaries: Array[Double], - val predictions: Array[Double]) + val predictions: Array[Double], + isotonic: Boolean) extends Serializable { private def isSorted(xs: Array[Double]): Boolean = { @@ -46,6 +47,12 @@ class IsotonicRegressionModel ( true } + if (isotonic) { + assert(isSorted(predictions)) + } else { + assert(isSorted(predictions.map(-_))) + } + assert(isSorted(boundaries)) assert(boundaries.length == predictions.length) @@ -77,11 +84,15 @@ class IsotonicRegressionModel ( * * @param testData Feature to be labeled. * @return Predicted label. - * If testData exactly matches a boundary then associated prediction is directly returned. - * If testData is lower or higher than all boundaries. - * then first or last prediction is returned respectively. - * If testData falls between two values in boundary array then predictions is treated - * as piecewise linear function and interpolated value is returned. + * 1) If testData exactly matches a boundary then associated prediction is returned. + * In case there are multiple predictions with the same boundary then one of them + * is returned. Which one is undefined (same as java.util.Arrays.binarySearch). + * 2) If testData is lower or higher than all boundaries then first or last prediction + * is returned respectively. In case there are multiple predictions with the same + * boundary then the lowest or highest is returned respectively. + * 3) If testData falls between two values in boundary array then prediction is treated + * as piecewise linear function and interpolated value is returned. In case there are + * multiple values with the same boundary then the same rules as in 2) are used. */ def predict(testData: Double): Double = { @@ -131,12 +142,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali /** * Constructs IsotonicRegression instance with default parameter isotonic = true. + * * @return New instance of IsotonicRegression. */ def this() = this(true) /** * Sets the isotonic parameter. + * * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence. * @return This instance of IsotonicRegression. */ @@ -151,10 +164,23 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input RDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. + * If multiple labels share the same feature value then they are ordered before + * the algorithm is executed. * @return Isotonic regression model. */ def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = { - createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic) + val preprocessedInput = if (isotonic) { + input + } else { + input.map(x => (-x._1, x._2, x._3)) + } + + val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput) + + val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1) + val boundaries = isotonicRegression.map(_._2) + + new IsotonicRegressionModel(boundaries, predictions, isotonic) } /** @@ -163,28 +189,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. - * + * If multiple labels share the same feature value then they are ordered before + * the algorithm is executed. * @return Isotonic regression model. */ - def run( - input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = { + def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = { run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]]) } - /** - * Creates isotonic regression model with given parameters. - * - * @param predictions Predictions calculated using pool adjacent violators algorithm. - * Used for predictions on new data points. - * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence. - * @return Isotonic regression model. - */ - protected def createModel( - predictions: Array[(Double, Double, Double)], - isotonic: Boolean): IsotonicRegressionModel = { - new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1)) - } - /** * Performs a pool adjacent violators algorithm (PAV). * Uses approach with single processing of data where violators @@ -192,13 +204,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * Uses optimization of discovering monotonicity violating sequences (blocks). * * @param input Input data of tuples (label, feature, weight). - * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence. * @return Result tuples (label, feature, weight) where labels were updated * to form a monotone sequence as per isotonic regression definition. */ private def poolAdjacentViolators( - input: Array[(Double, Double, Double)], - isotonic: Boolean): Array[(Double, Double, Double)] = { + input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { // Pools sub array within given bounds assigning weighted average value to all elements. def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { @@ -214,15 +224,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali } } - val monotonicityConstraintHolds: (Double, Double) => Boolean = - (x, y) => if (isotonic) x <= y else x >= y - var i = 0 while (i < input.length) { var j = i // Find monotonicity violating sequence, if any. - while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) { + while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) { j = j + 1 } @@ -232,7 +239,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali } else { // Otherwise pool the violating sequence // and check if pooling caused monotonicity violation in previously processed points. - while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) { + while (i >= 0 && input(i)._1 > input(i + 1)._1) { pool(input, i, j) i = i - 1 } @@ -248,19 +255,17 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * Performs parallel pool adjacent violators algorithm. * Performs Pool adjacent violators algorithm on each partition and then again on the result. * - * @param testData Input data of tuples (label, feature, weight). - * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence. + * @param input Input data of tuples (label, feature, weight). * @return Result tuples (label, feature, weight) where labels were updated * to form a monotone sequence as per isotonic regression definition. */ private def parallelPoolAdjacentViolators( - testData: RDD[(Double, Double, Double)], - isotonic: Boolean): Array[(Double, Double, Double)] = { + input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - val parallelStepResult = testData - .sortBy(_._2) - .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator) + val parallelStepResult = input + .sortBy(x => (x._2, x._1)) + .mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator) - poolAdjacentViolators(parallelStepResult.collect(), isotonic) + poolAdjacentViolators(parallelStepResult.collect()) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 24dca73d42cb4..962e52536da66 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -144,6 +144,30 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M assert(model.predict(10) === 10d/3) } + test("isotonic regression prediction with duplicate features") { + val trainRDD = sc.parallelize( + Seq[(Double, Double, Double)]( + (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache() + val model = new IsotonicRegression().run(trainRDD) + + assert(model.predict(0) === 1) + assert(model.predict(1.5) === 2) + assert(model.predict(2.5) === 4.5) + assert(model.predict(4) === 6) + } + + test("antitonic regression prediction with duplicate features") { + val trainRDD = sc.parallelize( + Seq[(Double, Double, Double)]( + (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache() + val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) + + assert(model.predict(0) === 6) + assert(model.predict(1.5) === 4.5) + assert(model.predict(2.5) === 2) + assert(model.predict(4) === 1) + } + test("isotonic regression RDD prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()