Skip to content

Commit

Permalink
SPARK-3278 changes after PR comments apache#3519. Isotonic parameter …
Browse files Browse the repository at this point in the history
…removed from algorithm, defined behaviour for multiple data points with the same feature value, added tests to verify it
  • Loading branch information
zapletal-martin committed Jan 30, 2015
1 parent e60a34f commit 88eb4e2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD
* @param boundaries Array of boundaries for which predictions are known.
* Boundaries must be sorted in increasing order.
* @param predictions Array of predictions associated to the boundaries at the same index.
* Result of isotonic regression and therefore is monotone.
* Results of isotonic regression and therefore monotone.
*/
class IsotonicRegressionModel (
boundaries: Array[Double],
val predictions: Array[Double])
val predictions: Array[Double],
isotonic: Boolean)
extends Serializable {

private def isSorted(xs: Array[Double]): Boolean = {
Expand All @@ -46,6 +47,12 @@ class IsotonicRegressionModel (
true
}

if (isotonic) {
assert(isSorted(predictions))
} else {
assert(isSorted(predictions.map(-_)))
}

assert(isSorted(boundaries))
assert(boundaries.length == predictions.length)

Expand Down Expand Up @@ -77,11 +84,15 @@ class IsotonicRegressionModel (
*
* @param testData Feature to be labeled.
* @return Predicted label.
* If testData exactly matches a boundary then associated prediction is directly returned.
* If testData is lower or higher than all boundaries.
* then first or last prediction is returned respectively.
* If testData falls between two values in boundary array then predictions is treated
* as piecewise linear function and interpolated value is returned.
* 1) If testData exactly matches a boundary then associated prediction is returned.
* In case there are multiple predictions with the same boundary then one of them
* is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
* 2) If testData is lower or higher than all boundaries then first or last prediction
* is returned respectively. In case there are multiple predictions with the same
* boundary then the lowest or highest is returned respectively.
* 3) If testData falls between two values in the boundary array then the prediction is treated
*    as a piecewise linear function and the interpolated value is returned. In case there are
*    multiple values with the same boundary then the same rules as in 2) are used.
*/
def predict(testData: Double): Double = {

Expand Down Expand Up @@ -131,12 +142,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali

/**
* Constructs IsotonicRegression instance with default parameter isotonic = true.
*
* @return New instance of IsotonicRegression.
*/
def this() = this(true)

/**
* Sets the isotonic parameter.
*
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return This instance of IsotonicRegression.
*/
Expand All @@ -151,10 +164,23 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*              If multiple labels share the same feature value then they are sorted
*              (secondarily by label) before the algorithm is executed.
* @return Isotonic regression model.
*/
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
val preprocessedInput = if (isotonic) {
input
} else {
input.map(x => (-x._1, x._2, x._3))
}

val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)

val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1)
val boundaries = isotonicRegression.map(_._2)

new IsotonicRegressionModel(boundaries, predictions, isotonic)
}

/**
Expand All @@ -163,42 +189,26 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*
* If multiple labels share the same feature value then they are ordered before
* the algorithm is executed.
* @return Isotonic regression model.
*/
def run(
input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]])
}

/**
* Creates isotonic regression model with given parameters.
*
* @param predictions Predictions calculated using pool adjacent violators algorithm.
* Used for predictions on new data points.
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
protected def createModel(
predictions: Array[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1))
}

/**
* Performs a pool adjacent violators algorithm (PAV).
* Uses approach with single processing of data where violators
* in previously processed data created by pooling are fixed immediately.
* Uses optimization of discovering monotonicity violating sequences (blocks).
*
* @param input Input data of tuples (label, feature, weight).
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)],
isotonic: Boolean): Array[(Double, Double, Double)] = {
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
Expand All @@ -214,15 +224,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
}
}

val monotonicityConstraintHolds: (Double, Double) => Boolean =
(x, y) => if (isotonic) x <= y else x >= y

var i = 0
while (i < input.length) {
var j = i

// Find monotonicity violating sequence, if any.
while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) {
while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) {
j = j + 1
}

Expand All @@ -232,7 +239,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
} else {
// Otherwise pool the violating sequence
// and check if pooling caused monotonicity violation in previously processed points.
while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) {
while (i >= 0 && input(i)._1 > input(i + 1)._1) {
pool(input, i, j)
i = i - 1
}
Expand All @@ -248,19 +255,17 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* Performs parallel pool adjacent violators algorithm.
* Performs Pool adjacent violators algorithm on each partition and then again on the result.
*
* @param testData Input data of tuples (label, feature, weight).
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @param input Input data of tuples (label, feature, weight).
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def parallelPoolAdjacentViolators(
testData: RDD[(Double, Double, Double)],
isotonic: Boolean): Array[(Double, Double, Double)] = {
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val parallelStepResult = testData
.sortBy(_._2)
.mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
val parallelStepResult = input
.sortBy(x => (x._2, x._1))
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)

poolAdjacentViolators(parallelStepResult.collect(), isotonic)
poolAdjacentViolators(parallelStepResult.collect())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
assert(model.predict(10) === 10d/3)
}

test("isotonic regression prediction with duplicate features") {
  // Two labels per feature value (1, 2 and 3). Duplicates are sorted before PAV
  // runs — see the training docs — so each label stays a separate boundary here.
  val points = Seq[(Double, Double, Double)](
    (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))
  val trainRDD = sc.parallelize(points).cache()
  val model = new IsotonicRegression().run(trainRDD)

  assert(model.predict(0) === 1)      // below every boundary -> lowest prediction
  assert(model.predict(1.5) === 2)    // between duplicate boundaries at 1 and 2
  assert(model.predict(2.5) === 4.5)  // linear interpolation between 4 and 5
  assert(model.predict(4) === 6)      // above every boundary -> highest prediction
}

test("antitonic regression prediction with duplicate features") {
  // Mirror of the isotonic duplicate-feature case: with isotonic = false the
  // fitted sequence is non-increasing, so predictions fall as features grow.
  val points = Seq[(Double, Double, Double)](
    (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))
  val trainRDD = sc.parallelize(points).cache()
  val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)

  assert(model.predict(0) === 6)      // below every boundary -> highest prediction
  assert(model.predict(1.5) === 4.5)  // linear interpolation between 5 and 4
  assert(model.predict(2.5) === 2)    // between duplicate boundaries at 2 and 3
  assert(model.predict(4) === 1)      // above every boundary -> lowest prediction
}

test("isotonic regression RDD prediction") {
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()
Expand Down

0 comments on commit 88eb4e2

Please sign in to comment.