destroy broadcasts; applyFeaturesStd
sethah committed Jul 5, 2017
1 parent a19b385 commit a51e565
Showing 3 changed files with 70 additions and 32 deletions.

DifferentiableRegularization.scala
@@ -40,13 +40,13 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] {
* @param regParam The magnitude of the regularization.
* @param shouldApply A function (Int => Boolean) indicating whether a given index should have
*        regularization applied to it.
- * @param featuresStd Option for a function which maps coefficient index (column major) to the
- *        feature standard deviation. If `None`, no standardization is applied.
+ * @param applyFeaturesStd Option for a function which maps coefficient index (column major) to the
+ *        feature standard deviation. If `None`, no standardization is applied.
*/
private[ml] class L2Regularization(
override val regParam: Double,
shouldApply: Int => Boolean,
- featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] {
+ applyFeaturesStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] {

override def calculate(coefficients: Vector): (Double, Vector) = {
coefficients match {
@@ -55,7 +55,7 @@ private[ml] class L2Regularization(
val gradient = new Array[Double](dv.size)
dv.values.indices.filter(shouldApply).foreach { j =>
val coef = coefficients(j)
- featuresStd match {
+ applyFeaturesStd match {
case Some(getStd) =>
val std = getStd(j)
if (std != 0.0) {
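For context on the rename: applyFeaturesStd is the hook that calculate uses to standardize each coefficient. A minimal usage sketch follows (values and the shouldApply predicate are illustrative; the class is private[ml], so this only compiles inside Spark's org.apache.spark.ml package):

    import org.apache.spark.ml.linalg.Vectors

    val featuresStd = Array(0.5, 2.0, 0.0)          // third feature is constant
    val reg = new L2Regularization(
      regParam = 0.1,
      shouldApply = _ < 2,                          // e.g. skip a slot that holds an intercept
      applyFeaturesStd = Some(j => featuresStd(j))) // coefficient index -> feature std
    // calculate returns the penalty and its gradient; per the std != 0.0 guard
    // above, a constant feature contributes nothing when standardization is on.
    val (loss, gradient) = reg.calculate(Vectors.dense(1.0, -2.0, 3.0))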

LeastSquaresAggregatorSuite.scala
@@ -16,7 +16,10 @@
*/
package org.apache.spark.ml.optim.aggregator

+ import scala.util.{Failure, Success, Try}
+
import org.apache.spark.SparkFunSuite
+ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
@@ -53,7 +56,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
private def getNewAggregator(
instances: Array[Instance],
coefficients: Vector,
- fitIntercept: Boolean): LeastSquaresAggregator = {
+ fitIntercept: Boolean): (Seq[Broadcast[_]], LeastSquaresAggregator) = {
val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances)
val yStd = math.sqrt(ySummarizer.variance(0))
val yMean = ySummarizer.mean(0)
@@ -62,40 +65,53 @@
val featuresMean = featuresSummarizer.mean
val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
- new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
-   bcFeaturesMean)(bcCoefficients)
+ val broadcasts = List(bcCoefficients, bcFeaturesMean, bcFeaturesStd)
+ val agg = Try(new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
+     bcFeaturesMean)(bcCoefficients)) match {
+   case Success(a) => a
+   case Failure(exception) =>
+     broadcasts.foreach(_.destroy(blocking = false))
+     throw exception
+ }
+ (broadcasts, agg)
}
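
Note: the Try/Failure branch above destroys the broadcast variables when the aggregator's constructor throws, so a failed construction does not leak broadcast blocks on the executors. The same pattern as a standalone sketch (helper name hypothetical; the destroy(blocking) overload is private[spark], so like these suites the code must live under an org.apache.spark package):

    import scala.util.{Failure, Success, Try}
    import org.apache.spark.broadcast.Broadcast

    // Build a value that captures some broadcasts; if construction fails,
    // destroy the broadcasts before rethrowing.
    def buildWithCleanup[A](broadcasts: Seq[Broadcast[_]])(build: => A): (Seq[Broadcast[_]], A) = {
      Try(build) match {
        case Success(a) => (broadcasts, a)
        case Failure(e) =>
          broadcasts.foreach(_.destroy(blocking = false))
          throw e
      }
    }

Returning the broadcasts alongside the aggregator lets each test destroy them once its assertions pass; destroy (rather than unpersist) fits here because the tests never reuse the broadcasts.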

test("aggregator add method input size") {
val coefficients = Vectors.dense(1.0, 2.0)
- val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
+ val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true)
withClue("LeastSquaresAggregator features dimension must match coefficients dimension") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
}
}
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("negative weight") {
val coefficients = Vectors.dense(1.0, 2.0)
- val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
+ val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true)
withClue("LeastSquaresAggregator does not support negative instance weights.") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
}
}
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("check sizes") {
val coefficients = Vectors.dense(1.0, 2.0)
- val aggIntercept = getNewAggregator(instances, coefficients, fitIntercept = true)
- val aggNoIntercept = getNewAggregator(instances, coefficients, fitIntercept = false)
+ val (broadcastsIntercept, aggIntercept) = getNewAggregator(instances, coefficients,
+   fitIntercept = true)
+ val (broadcastsNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefficients,
+   fitIntercept = false)
instances.foreach(aggIntercept.add)
instances.foreach(aggNoIntercept.add)

// least squares agg does not include intercept in its gradient array
assert(aggIntercept.gradient.size === 2)
assert(aggNoIntercept.gradient.size === 2)
+ broadcastsIntercept.foreach(_.destroy(blocking = false))
+ broadcastsNoIntercept.foreach(_.destroy(blocking = false))
}

test("check correctness") {
@@ -111,7 +127,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val yStd = math.sqrt(ySummarizer.variance(0))
val yMean = ySummarizer.mean(0)

- val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
+ val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true)
instances.foreach(agg.add)

// compute (y - pred) analytically
@@ -145,11 +161,12 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {

test("check with zero standard deviation") {
val coefficients = Vectors.dense(1.0, 2.0)
- val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
-   fitIntercept = true)
+ val (broadcastsConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature,
+   coefficients, fitIntercept = true)
instances.foreach(aggConstantFeature.add)
// constant features should not affect gradient
assert(aggConstantFeature.gradient(0) === 0.0)
+ broadcastsConstantFeature.foreach(_.destroy(blocking = false))

withClue("LeastSquaresAggregator does not support zero standard deviation of the label") {
intercept[IllegalArgumentException] {

LogisticAggregatorSuite.scala
@@ -16,7 +16,10 @@
*/
package org.apache.spark.ml.optim.aggregator

+ import scala.util.{Failure, Success, Try}
+
import org.apache.spark.SparkFunSuite
+ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors}
import org.apache.spark.ml.util.TestingUtils._
Expand All @@ -43,44 +46,54 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
)
}


+ /** Get summary statistics for some data and create a new LogisticAggregator. */
private def getNewAggregator(
instances: Array[Instance],
coefficients: Vector,
fitIntercept: Boolean,
- isMultinomial: Boolean): LogisticAggregator = {
+ isMultinomial: Boolean): (Seq[Broadcast[_]], LogisticAggregator) = {
val (featuresSummarizer, ySummarizer) =
DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
val numClasses = ySummarizer.histogram.length
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
- new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
+ val broadcasts = List(bcFeaturesStd, bcCoefficients)
+ val agg = Try(new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept,
+     isMultinomial)(bcCoefficients)) match {
+   case Success(a) => a
+   case Failure(exception) =>
+     broadcasts.foreach(_.destroy(blocking = false))
+     throw exception
+ }
+ (broadcasts, agg)
}
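
The call sites below destroy the broadcasts at the end of each test body. A try/finally variant, a sketch rather than what these tests do, would release them even when an assertion throws:

    // instances and coefficients are assumed in scope, as in the tests below.
    val (broadcasts, agg) = getNewAggregator(instances, coefficients,
      fitIntercept = true, isMultinomial = true)
    try {
      instances.foreach(agg.add)
      // ... assertions on agg.loss and agg.gradient ...
    } finally {
      broadcasts.foreach(_.destroy(blocking = false)) // runs even if an assert fails
    }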

test("aggregator add method input size") {
val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0)
val interceptArray = Array(4.0, 2.0, -3.0)
- val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+ val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
fitIntercept = true, isMultinomial = true)
withClue("LogisticAggregator features dimension must match coefficients dimension") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
}
}
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("negative weight") {
val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0)
val interceptArray = Array(4.0, 2.0, -3.0)
- val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+ val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
fitIntercept = true, isMultinomial = true)
withClue("LogisticAggregator does not support negative instance weights") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
}
}
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("check sizes multinomial") {
@@ -91,15 +104,17 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
Array.fill(numClasses * (numFeatures + 1))(rng.nextDouble))
val coefWithoutIntercept = Vectors.dense(
Array.fill(numClasses * numFeatures)(rng.nextDouble))
- val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true,
-   isMultinomial = true)
- val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false,
-   isMultinomial = true)
+ val (broadcastIntercept, aggIntercept) = getNewAggregator(instances, coefWithIntercept,
+   fitIntercept = true, isMultinomial = true)
+ val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefWithoutIntercept,
+   fitIntercept = false, isMultinomial = true)
instances.foreach(aggIntercept.add)
instances.foreach(aggNoIntercept.add)

assert(aggIntercept.gradient.size === (numFeatures + 1) * numClasses)
assert(aggNoIntercept.gradient.size === numFeatures * numClasses)
+ broadcastIntercept.foreach(_.destroy(blocking = false))
+ broadcastNoIntercept.foreach(_.destroy(blocking = false))
}

test("check sizes binomial") {
@@ -108,15 +123,17 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val numFeatures = binaryInstances.head.features.size
val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
- val aggIntercept = getNewAggregator(binaryInstances, coefWithIntercept, fitIntercept = true,
-   isMultinomial = false)
- val aggNoIntercept = getNewAggregator(binaryInstances, coefWithoutIntercept,
-   fitIntercept = false, isMultinomial = false)
+ val (broadcastIntercept, aggIntercept) = getNewAggregator(binaryInstances, coefWithIntercept,
+   fitIntercept = true, isMultinomial = false)
+ val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(binaryInstances,
+   coefWithoutIntercept, fitIntercept = false, isMultinomial = false)
binaryInstances.foreach(aggIntercept.add)
binaryInstances.foreach(aggNoIntercept.add)

assert(aggIntercept.gradient.size === numFeatures + 1)
assert(aggNoIntercept.gradient.size === numFeatures)
+ broadcastIntercept.foreach(_.destroy(blocking = false))
+ broadcastNoIntercept.foreach(_.destroy(blocking = false))
}

test("check correctness multinomial") {
@@ -133,7 +150,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val weightSum = instances.map(_.weight).sum

- val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+ val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
fitIntercept = true, isMultinomial = true)
instances.foreach(agg.add)

@@ -182,6 +199,7 @@

assert(loss ~== agg.loss relTol 0.01)
assert(gradient ~== agg.gradient relTol 0.01)
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("check correctness binomial") {
@@ -199,8 +217,8 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val weightSum = binaryInstances.map(_.weight).sum

- val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
-   fitIntercept = true, isMultinomial = false)
+ val (broadcasts, agg) = getNewAggregator(binaryInstances,
+   Vectors.dense(coefArray ++ Array(intercept)), fitIntercept = true, isMultinomial = false)
binaryInstances.foreach(agg.add)

// compute the loss
@@ -228,6 +246,7 @@

assert(loss ~== agg.loss relTol 0.01)
assert(gradient ~== agg.gradient relTol 0.01)
+ broadcasts.foreach(_.destroy(blocking = false))
}

test("check with zero standard deviation") {
@@ -236,19 +255,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
}
val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0)
val interceptArray = Array(4.0, 2.0, -3.0)
- val aggConstantFeature = getNewAggregator(instancesConstantFeature,
+ val (broadcastConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature,
Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true)
instances.foreach(aggConstantFeature.add)
// constant features should not affect gradient
assert(aggConstantFeature.gradient(0) === 0.0)
+ broadcastConstantFeature.foreach(_.destroy(blocking = false))

val binaryCoefArray = Array(1.0, 2.0)
val intercept = 1.0
- val aggConstantFeatureBinary = getNewAggregator(binaryInstances,
+ val (broadcastConstantBinary, aggConstantFeatureBinary) = getNewAggregator(binaryInstances,
Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true,
isMultinomial = false)
instances.foreach(aggConstantFeatureBinary.add)
// constant features should not affect gradient
assert(aggConstantFeatureBinary.gradient(0) === 0.0)
+ broadcastConstantBinary.foreach(_.destroy(blocking = false))
}
}
