
[SPARK-20988][ML] Logistic regression uses aggregator hierarchy #18305

Closed
wants to merge 7 commits

Conversation

@sethah (Contributor) commented Jun 14, 2017

What changes were proposed in this pull request?

This change pulls the LogisticAggregator class out of LogisticRegression.scala and makes it extend DifferentiableLossAggregator. It also changes logistic regression to use the generic RDDLossFunction instead of having its own.
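
For orientation, here is a rough sketch of the aggregator contract this hierarchy centers on. The trait and member names follow the discussion in this PR (add, merge, loss, gradient, weight), but the bodies below are simplified approximations rather than the patch itself.

    import org.apache.spark.ml.linalg.{Vector, Vectors}

    // Illustrative sketch only: names follow this PR's discussion, bodies are simplified.
    private[ml] trait DifferentiableLossAggregator[
        Datum,
        Agg <: DifferentiableLossAggregator[Datum, Agg]] extends Serializable {
      self: Agg =>

      protected var weightSum: Double = 0.0
      protected var lossSum: Double = 0.0
      protected val dim: Int
      protected lazy val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

      /** Add a single (weighted) instance to this aggregator. */
      def add(instance: Datum): Agg

      /** Merge another aggregator of the same type; used as treeAggregate's combOp. */
      def merge(other: Agg): Agg = {
        if (other.weightSum != 0) {
          weightSum += other.weightSum
          lossSum += other.lossSum
          var i = 0
          while (i < dim) {
            gradientSumArray(i) += other.gradientSumArray(i)
            i += 1
          }
        }
        this
      }

      /** Total weight of the instances seen so far. */
      def weight: Double = weightSum

      /** Weighted average loss over the instances seen so far. */
      def loss: Double = lossSum / weightSum

      /** Weighted average gradient over the instances seen so far. */
      def gradient: Vector = Vectors.dense(gradientSumArray.map(_ / weightSum))
    }

Concrete aggregators such as LogisticAggregator and LeastSquaresAggregator then only need to supply add, while the shared RDDLossFunction drives the treeAggregate over the instances.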

Other minor changes:

  • L2Regularization accepts an Option[Int => Double] mapping a coefficient index to the corresponding feature's standard deviation
  • L2Regularization uses the Vector type instead of Array[Double]
  • Some tests were added for LeastSquaresAggregator

How was this patch tested?

Unit test suites are added.

@sethah (Contributor, Author) commented Jun 14, 2017

cc @VinceShieh @MLnick @srowen

@SparkQA commented Jun 14, 2017

Test build #78063 has finished for PR 18305 at commit 6edd128.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 14, 2017

Test build #78065 has finished for PR 18305 at commit fcf5372.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah (Contributor, Author) commented Jun 23, 2017

Also ping @hhbyyh @yanboliang. This is a straightforward follow-up to #17094. Let me know if I can do anything to make the review easier.

@yanboliang (Contributor) commented Jun 26, 2017

@sethah I will take a look in a few days after handling some backlog, thanks for your patience.

@@ -50,7 +50,7 @@ private[ml] class RDDLossFunction[
Agg <: DifferentiableLossAggregator[T, Agg]: ClassTag](
instances: RDD[T],
getAggregator: (Broadcast[Vector] => Agg),
regularization: Option[DifferentiableRegularization[Array[Double]]],
regularization: Option[DifferentiableRegularization[Vector]],
@MLnick (Contributor) Jun 26, 2017:

This change is just to make things more generic / future-proof, yes?

Contributor:

support sparse coefficients I guess.

Contributor:

Sure. But it seems a little strange to make this a Spark Vector while RDDLossFunction is for a Breeze Vector.

Contributor (Author):

Commented on this below.

@@ -322,10 +322,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String

val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept),
bcFeaturesStd, bcFeaturesMean)(_)
val getFeaturesStd = (j: Int) => if (j >=0 && j < numFeatures) featuresStd(j) else 0.0
Contributor:

nit: missing space in ">=0" (should be ">= 0")

@hhbyyh (Contributor) left a comment:

Minor comments. Generally LGTM.

weightSum += weight
this
}
}
Contributor:

For the convenience of other reviewers: I checked this part, and the new implementation delegates merge, gradient, weight, and loss to the common implementation in DifferentiableLossAggregator, with no further changes from the original implementation. LGTM.

@@ -38,34 +40,39 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] {
* @param regParam The magnitude of the regularization.
Contributor:

The doc comment for this function needs to be updated (it still says "array of coefficients").

Contributor (Author):

Good catch. Done

}
(0.5 * sum * regParam, Vectors.dense(gradient))
case _: SparseVector =>
throw new IllegalArgumentException("SparseVector is not currently supported.")
Contributor:

message: Sparse coefficients...

Contributor (Author):

done


@@ -62,8 +62,8 @@ private[ml] class RDDLossFunction[
val newAgg = instances.treeAggregate(thisAgg)(seqOp, combOp, aggregationDepth)
val gradient = newAgg.gradient
val regLoss = regularization.map { regFun =>
val (regLoss, regGradient) = regFun.calculate(coefficients.data)
BLAS.axpy(1.0, Vectors.dense(regGradient), gradient)
val (regLoss, regGradient) = regFun.calculate(Vectors.fromBreeze(coefficients))
Contributor:

So here, for example, we need to convert from Breeze. We could just make the L2 reg work on a Breeze vector as well? It feels like we should just be able to compose the loss function and reg loss function together.

Contributor (Author):

When you say compose, do you just mean have the same input/output types, or do you mean you want to do something like this?:

val combinedLoss: DiffFunction[Vector] = dataLoss.add(regLoss)

I do agree it's awkward. Ideally, the loss functions would use Spark vectors, but that isn't possible since Breeze requires that the type being optimized implements certain type classes. However, we don't directly optimize the regularization, so it doesn't have to be a Breeze vector. Could we make it all Breeze? Yes, but then we'd have to convert the gradient from the aggregator to Breeze and add them using Breeze ops... or something. I don't see that one way is really better than the other until we can implement something more comprehensive - for example, implementing an optimization framework directly in Spark would solve this: https://issues.apache.org/jira/browse/SPARK-17136

I suppose I'm willing to change this, though I'm not sure how much it helps, and since it's all internal we aren't locking ourselves into anything. Additionally, these awkward conversions happen only here, in one place, so they aren't spread throughout the codebase. Thoughts?
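
For reference, the kind of composition being discussed could look roughly like the sketch below, staying entirely on Breeze vectors. This is a hypothetical helper for illustration, not something this PR adds.

    import breeze.linalg.{DenseVector => BDV}
    import breeze.optimize.DiffFunction

    // Hypothetical: fold a regularization DiffFunction into the data-loss DiffFunction,
    // so the optimizer sees a single objective. This PR instead keeps the reg term as a
    // separate Spark-Vector function and adds its loss/gradient contributions by hand.
    def compose(
        dataLoss: DiffFunction[BDV[Double]],
        regLoss: DiffFunction[BDV[Double]]): DiffFunction[BDV[Double]] = {
      new DiffFunction[BDV[Double]] {
        override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
          val (loss1, grad1) = dataLoss.calculate(coefficients)
          val (loss2, grad2) = regLoss.calculate(coefficients)
          (loss1 + loss2, grad1 + grad2)
        }
      }
    }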

@MLnick (Contributor) Jun 30, 2017:

Yeah, more or less - see how Breeze does L2 reg in DiffFunction.withL2Regularization - though I guess that is not quite what I'm getting at.

In any case, it's not a major issue. But perhaps a cleaner way can be found in future - or perhaps SPARK-17136 can address it.

Contributor (Author):

I agree there is room for improvement. My preference is to leave it for a follow up, though.

@@ -157,4 +160,38 @@ object DifferentiableLossAggregatorSuite {
this
}
}

/** Get feature and label summarizers for provided data. */
private[ml] def getRegressionSummarizers(
Contributor:

Seems like these convenience functions could be adapted so they can be used both in the train logic across the linear models and here in the tests? The seqOp and combOp are currently defined in multiple places.

Though that could be done in a later clean up PR perhaps?

Contributor (Author):

Yeah, I thought about it. Best for a follow up I think.

https://issues.apache.org/jira/browse/SPARK-21245
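
As a rough illustration of the kind of shared helper being discussed (the object name and scope are hypothetical, and the sketch ignores instance weights and the label summarizer for brevity):

    import org.apache.spark.ml.feature.Instance
    import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    import org.apache.spark.rdd.RDD

    // Hypothetical helper (would live under org.apache.spark.ml): build the feature
    // summarizer in one pass so the seqOp/combOp pair is not re-defined in every
    // train method and test suite.
    private[ml] object SummarizerUtils {
      def summarizeFeatures(instances: RDD[Instance]): MultivariateOnlineSummarizer = {
        val seqOp = (summary: MultivariateOnlineSummarizer, instance: Instance) =>
          summary.add(OldVectors.fromML(instance.features))
        val combOp = (s1: MultivariateOnlineSummarizer, s2: MultivariateOnlineSummarizer) =>
          s1.merge(s2)
        instances.treeAggregate(new MultivariateOnlineSummarizer)(seqOp, combOp)
      }
    }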

instances.foreach(agg.add)

// compute the loss
val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i / numClasses)).toArray
Contributor:

Should we not also test the non-standardized path?

Contributor (Author):

Not sure I follow. What path do you mean?

Contributor:

Here we're checking that the loss and gradient computed analytically are equivalent to those computed by the aggregator, but only for the "standardized" version (which does implicit, on-the-fly standardization). Doesn't that mean we haven't compared the standardization=false case?

@yanboliang (Contributor) Jul 5, 2017:

It's better to check the standardization=false case.

Contributor (Author):

The LogisticAggregator is agnostic to standardization. There is no option to turn standardization off or on for the LogisticAggregator. In logistic regression, turning it off or on affects only the regularization - and so it is tested there.

Contributor:

That sounds weird, actually. It means there is no point in having the standardization param? If it's false, the loss gradients are still computed in the scaled space, while the reg gradient is not?

That doesn't sound like expected behavior if a user explicitly sets standardization=false.

scala> val lr = new LogisticRegression()
scala> val model1 = lr.setStandardization(false).setRegParam(0.0).fit(df)
scala> val model2 = lr.setStandardization(true).setRegParam(0.0).fit(df)
scala> val model3 = lr.setStandardization(false).setRegParam(0.1).fit(df)
scala> val model4 = lr.setStandardization(true).setRegParam(0.1).fit(df)

scala> model1.coefficients == model2.coefficients
res0: Boolean = true

scala> model3.coefficients == model4.coefficients
res1: Boolean = false

Contributor (Author):

Yes, this is the intended behavior. You can read the scaladoc to verify.

@MLnick (Contributor) Jul 18, 2017:

I know it's intended. But to me it's not what a user would expect. Surely, by setting standardization=false, a user expects the model not to be trained in the scaled space?

Put another way - what's the point of having the regularization loss effectively not scaled, but the objective loss always scaled? May as well not have a standardization parameter then.

Contributor (Author):

What users interpret standardization to mean may be different than what it means in Spark, sure. I wasn't around when that design choice was made.

Standardization is always done internally because it improves convergence. When no regularization is applied, you can verify that the two scenarios:

  • training on scaled features and then converting the coefficients to the unscaled space
  • training on un-scaled features

produce the same coefficients. So, instead of literally not standardizing the features when standardization is false, we just do it anyway because it's better for convergence, and it doesn't affect the results.

However, when regularization is applied, the results are not the same. The L2 loss, for example, is sum_j beta_j^2. But the coefficients we use during training are scaled coefficients (since we always scale), effectively beta_{j, scaled} = beta_j * sigma_j. We need to "unstandardize" the coefficients for the regularization part of the loss because their scale does matter there. If we didn't unstandardize them, we'd change the regularization loss to sum_j (beta_j * sigma_j)^2, which would not be correct when the user asked not to train in the scaled space.

Do I think this is all a bit confusing, especially for users? Yes. I believe this was done to match glmnet. I think it's fine the way it is, but that discussion is for another JIRA anyway.
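
To make the above concrete, here is a per-coefficient sketch of the L2 term. The Option[Int => Double] parameter mirrors the one this PR adds to L2Regularization, but the code is a paraphrase of the behavior described above, not the patch itself.

    // Per-coefficient L2 loss and gradient, computed on the coefficient the optimizer
    // actually sees, i.e. the scaled-space value coef = beta_j * sigma_j.
    def l2Contribution(
        j: Int,
        coef: Double,
        regParam: Double,
        applyFeaturesStd: Option[Int => Double]): (Double, Double) = {
      applyFeaturesStd match {
        case Some(getStd) =>
          // standardization = false: penalize the original-space coefficient
          // beta_j = coef / sigma_j, i.e. 0.5 * regParam * (coef / sigma_j)^2.
          val std = getStd(j)
          if (std != 0.0) {
            val temp = coef / (std * std)
            (0.5 * regParam * coef * temp, regParam * temp)
          } else {
            (0.0, 0.0)
          }
        case None =>
          // standardization = true: penalize the scaled coefficient directly.
          (0.5 * regParam * coef * coef, regParam * coef)
      }
    }

With regParam = 0 both branches contribute nothing, which is why model1 and model2 in the snippet above end up with identical coefficients while model3 and model4 do not.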

Contributor:

Yeah it's not necessary to discuss here.

@@ -46,11 +46,11 @@ class RDDLossFunctionSuite extends SparkFunSuite with MLlibTestSparkContext {
val lossWithReg = new RDDLossFunction(instances, getAgg, Some(regLossFun))

val (loss1, grad1) = lossNoReg.calculate(coefficients.asBreeze.toDenseVector)
val (regLoss, regGrad) = regLossFun.calculate(coefficients.toArray)
val (regLoss, regGrad) = regLossFun.calculate(coefficients)
Contributor:

Again this serves to highlight the inconsistency - the loss fn and reg loss fn should be composable IMO.

@MLnick (Contributor) commented Jun 27, 2017

Overall looks good, left a few comments about the reg function change and tests for logistic agg.

@SparkQA commented Jun 28, 2017

Test build #78820 has finished for PR 18305 at commit a19b385.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* coefficients.
*
* @param regParam The magnitude of the regularization.
* @param shouldApply A function (Int => Boolean) indicating whether a given index should have
* regularization applied to it.
* @param featuresStd Option indicating whether the regularization should be scaled by the standard
* deviation of the features.
* @param featuresStd Option for a function which maps coefficient index (column major) to the
Contributor:

Would getFeaturesStd or applyFeaturesStd be a better name, since this is a function rather than the original features std array?

Contributor (Author):

done

val numClasses = ySummarizer.histogram.length
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
Contributor:

It's better to destroy these broadcast variables explicitly, even in test suites.

Contributor (Author):

Thanks. I added some code to do this; it's certainly clunky, but it should suffice.

Contributor:

I guess it's done now - but is it really necessary? Do we do that explicitly in other tests? Since the Spark context / session is typically torn down after each test suite, I don't think it's a real issue.

It adds a whole bunch of code that just serves to obfuscate the actual test cases.

Contributor (Author):

Thoughts @yanboliang? Nick may have a valid point here - the size of these broadcast variables should be very small, and they'll get destroyed along with the SparkContext after every test suite. Is there something else we haven't thought of?

@yanboliang (Contributor) Jul 11, 2017:

I think we always try to destroy broadcast variables explicitly, both in source code and in test cases, like here. Of course, these broadcast variables can also be destroyed when the Spark session is torn down.
The reason we do this in source code is that a user's application may be long-running, so it would accumulate many of these variables, waste resources, and slow the application down.
The reason we do this in test cases is to keep the same code path as in the source code, since we have encountered similar bugs that were not covered by test cases.
But in this case, I think it's safe not to destroy these variables. I don't have a strong opinion and just suggested following MLlib's convention. Thanks.
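
For readers following along, a minimal self-contained illustration of the convention being described (broadcast, use, then destroy explicitly) might look like this; the snippet is only an example and mirrors the variable names used in the test above:

    import org.apache.spark.sql.SparkSession

    // Broadcast, use on executors, then destroy explicitly rather than waiting
    // for the SparkContext/session to be torn down.
    val spark = SparkSession.builder().master("local[2]").appName("broadcast-demo").getOrCreate()
    val featuresStd = Array(1.0, 2.0, 0.5)
    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
    try {
      val inverted = spark.sparkContext.parallelize(0 until featuresStd.length)
        .map(i => 1.0 / bcFeaturesStd.value(i))   // executor-side read of the broadcast
        .collect()
      println(inverted.mkString(", "))
    } finally {
      bcFeaturesStd.destroy()   // release driver- and executor-side copies eagerly
      spark.stop()
    }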


@SparkQA commented Jul 5, 2017

Test build #79244 has finished for PR 18305 at commit a51e565.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick (Contributor) commented Jul 11, 2017 via email

@yanboliang (Contributor) commented:
@MLnick I remember a bug I hit several months ago: we forgot to destroy a broadcast variable in the source code, and it threw an exception after we added an explicit destroy. This was because we had put the broadcast variable in the wrong place; it only worked as long as we didn't destroy it explicitly. This could have been caught if the corresponding test cases had destroyed the variable explicitly. Otherwise, we may forget to destroy broadcast variables explicitly in the source code, which is likely to lead to bugs. This is why I prefer to keep the logic consistent between source and tests. I think it's a trivial issue in this case, though. Thanks.

@sethah (Contributor, Author) commented Jul 12, 2017

Did we reach a consensus on the broadcast variables? My opinion is that it's probably better in this case not to worry about it, and we can back out the change that destroys them in the test suites.

}

val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures * numCoefficientSets
Contributor:

It seems that the regularization includes the intercept, right?

However, the comment at LogisticRegression.scala:1903 says:

    // We do not apply regularization to the intercepts

@MLnick (Contributor) Jul 18, 2017:

The intercepts are appended to the coefficient vector, so the idx for intercept will be >= numFeatures * numCoefficientSets. Hence this function ignores intercept reg.
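
A small, illustrative example of that layout (the sizes here are made up):

    // Flattened coefficient vector for multinomial LR with numFeatures = 3 and
    // numCoefficientSets = 2: indices 0 until 6 hold the feature coefficients, and
    // with fitIntercept the intercepts occupy the trailing indices 6 and 7.
    val numFeatures = 3
    val numCoefficientSets = 2
    val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures * numCoefficientSets
    assert(shouldApply(0) && shouldApply(5))   // feature coefficients get regularized
    assert(!shouldApply(6) && !shouldApply(7)) // intercepts are never regularized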

Contributor:

Thanks for clarifying.

}
case None =>
sum += coef * coef
gradient(j) = coef * regParam
Contributor:

Trivial: to match regParam * temp above, how about using regParam * coef?

Contributor (Author):

Sorry, I don't see the point in this change.

@MLnick (Contributor) commented Jul 18, 2017

@sethah IMO we should back out the explicit broadcast-variable destroy code in the tests, as it complicates things. I hear that this may help catch bugs... but frankly I'm not convinced.

Because the code setup & path in the source may not be quite the same as in the tests (almost never I'd say), I don't believe you will necessarily catch bugs such as the one mentioned by Yanbo.

@sethah (Contributor, Author) commented Jul 18, 2017

Thanks @MLnick. I agree with you about the broadcasting, so I have backed it out. I think all comments are addressed now; please let me know if there is anything else.

@SparkQA commented Jul 18, 2017

Test build #79715 has finished for PR 18305 at commit cb40b31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah (Contributor, Author) commented Jul 21, 2017

What's blocking us here?

@MLnick (Contributor) commented Jul 21, 2017

No blockers, I'm pretty happy. I just wanted to do a final pass mostly over the test cases.

@MLnick (Contributor) commented Jul 26, 2017

Jenkins retest this please

@SparkQA commented Jul 26, 2017

Test build #79962 has finished for PR 18305 at commit cb40b31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick (Contributor) commented Jul 26, 2017

Merged to master. Thanks @sethah, and thanks all for reviews.

@asfgit closed this in cf29828 on Jul 26, 2017
@sethah (Contributor, Author) commented Jul 26, 2017

Thanks @MLnick, @hhbyyh, and @facaiy for reviewing!
