[SPARK-21108] [ML] convert LinearSVC to aggregator framework #18315

Closed · wants to merge 9 commits
204 changes: 14 additions & 190 deletions mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
@@ -214,10 +214,20 @@ class LinearSVC @Since("2.2.0") (
}

val featuresStd = summarizer.variance.toArray.map(math.sqrt)
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LinearSVCCostFun(instances, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth))
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}

val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

def regParamL1Fun = (index: Int) => 0D
val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
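The hunk above swaps the hand-rolled LinearSVCCostFun for the shared RDDLossFunction, wiring in HingeAggregator through a curried constructor: `new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)` eta-expands to a function from the broadcast coefficients to a fresh aggregator, which the loss function can invoke on every optimizer iteration after re-broadcasting the current coefficients. A minimal sketch of that currying pattern, using stand-in names (Agg, getAgg) rather than the real Spark classes:

```scala
// Minimal sketch of the curried-constructor pattern; `Agg` is a stand-in,
// and plain arrays replace broadcast variables.
class Agg(featuresStd: Array[Double], fitIntercept: Boolean)(coef: Array[Double]) {
  // A real aggregator would fold each instance into its loss/gradient sums.
  def add(features: Array[Double]): this.type = this
}

object CurriedAggregatorDemo extends App {
  val std = Array(1.0, 2.0)
  // Eta-expansion: `new Agg(std, true)(_)` becomes Array[Double] => Agg,
  // with the iteration-invariant state (std, intercept flag) bound once.
  val getAgg: Array[Double] => Agg = new Agg(std, true)(_)
  // Each optimizer step builds a fresh aggregator from the current
  // coefficients without re-capturing the fixed state.
  val step1 = getAgg(Array(0.0, 0.0))
  val step2 = getAgg(Array(0.5, -0.5))
}
```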
@@ -372,189 +382,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
}
}
}

/**
* LinearSVCCostFun implements Breeze's DiffFunction[T] for the hinge loss function
*/
private class LinearSVCCostFun(
instances: RDD[Instance],
fitIntercept: Boolean,
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double,
aggregationDepth: Int) extends DiffFunction[BDV[Double]] {

override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
val bcCoeffs = instances.context.broadcast(coeffs)
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length

val svmAggregator = {
val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2)

instances.treeAggregate(
new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept)
)(seqOp, combOp, aggregationDepth)
}

val totalGradientArray = svmAggregator.gradient.toArray
// regVal is the sum of squared coefficients, excluding the intercept, for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
coeffs.foreachActive { case (index, value) =>
// We do not apply regularization to the intercepts
if (index != numFeatures) {
// The following code computes the loss and the gradient of the
// regularization term, adding the gradient back into totalGradientArray.
sum += {
if (standardization) {
totalGradientArray(index) += regParamL2 * value
value * value
} else {
if (featuresStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(index) * featuresStd(index))
totalGradientArray(index) += regParamL2 * temp
value * temp
} else {
0.0
}
}
}
}
}
0.5 * regParamL2 * sum
}
bcCoeffs.destroy(blocking = false)

(svmAggregator.loss + regVal, new BDV(totalGradientArray))
}
}
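// The reverse standardization above is easiest to see with concrete numbers
// (illustrative values, not part of the patch). With standardization = false,
// training still runs on standardized features, so a coefficient w learned over
// a feature with standard deviation std corresponds to w / std in the original
// space. To recover the unstandardized L2 objective 0.5 * regParamL2 * (w / std)^2,
// the code penalizes temp = w / std^2. For std = 2.0 and w = 4.0:
//   temp = 4.0 / (2.0 * 2.0) = 1.0
//   gradient contribution: regParamL2 * temp = regParamL2
//   loss term: w * temp = 4.0, which equals (w / std)^2 = (4.0 / 2.0)^2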

/**
* LinearSVCAggregator computes the gradient and loss for the hinge loss function, as used
* in binary classification for instances in sparse or dense vectors, in an online fashion.
*
* Two LinearSVCAggregators can be merged together to produce a summary of the loss and
* gradient over the corresponding joint dataset.
*
* This class standardizes feature values during computation using bcFeaturesStd.
*
* @param bcCoefficients The coefficients corresponding to the features.
* @param bcFeaturesStd The standard deviation values of the features.
* @param fitIntercept Whether to fit an intercept term.
*/
private class LinearSVCAggregator(
bcCoefficients: Broadcast[Vector],
bcFeaturesStd: Broadcast[Array[Double]],
fitIntercept: Boolean) extends Serializable {

private val numFeatures: Int = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
private var weightSum: Double = 0.0
private var lossSum: Double = 0.0
@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
s" but got type ${bcCoefficients.value.getClass}.")
}
private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)

/**
* Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
* of the objective function.
*
* @param instance The data point to be added.
* @return This LinearSVCAggregator object.
*/
def add(instance: Instance): this.type = {
instance match { case Instance(label, weight, features) =>

if (weight == 0.0) return this
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
val localGradientSumArray = gradientSumArray

val dotProduct = {
var sum = 0.0
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
sum += localCoefficients(index) * value / localFeaturesStd(index)
}
}
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
sum
}
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))).
// Therefore the subgradient is -(2y - 1) * x when the margin
// 1 - (2y - 1) * f_w(x) is positive, and zero otherwise.
val labelScaled = 2 * label - 1.0
val loss = if (1.0 > labelScaled * dotProduct) {
weight * (1.0 - labelScaled * dotProduct)
} else {
0.0
}

if (1.0 > labelScaled * dotProduct) {
val gradientScale = -labelScaled * weight
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
}
}
if (fitIntercept) {
localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
}
}

lossSum += loss
weightSum += weight
this
}
}

/**
* Merge another LinearSVCAggregator, and update the loss and gradient
* of the objective function.
* (Note that it's in place merging; as a result, `this` object will be modified.)
*
* @param other The other LinearSVCAggregator to be merged.
* @return This LinearSVCAggregator object.
*/
def merge(other: LinearSVCAggregator): this.type = {

if (other.weightSum != 0.0) {
weightSum += other.weightSum
lossSum += other.lossSum

var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
val len = localThisGradientSumArray.length
while (i < len) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
}
this
}

def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0

def gradient: Vector = {
if (weightSum != 0) {
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
result
} else {
Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
}
}
}
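With the deletions above, the boilerplate every algorithm used to re-implement (weight and loss bookkeeping, in-place merge, weighted loss and gradient extraction) moves into the framework's shared DifferentiableLossAggregator trait, leaving subclasses such as HingeAggregator to implement only `add` and `dim`. A simplified sketch of the trait's shape, with a plain array standing in for ml.linalg vectors; the real trait's details may differ:

```scala
// Simplified sketch of the shared aggregator trait; approximate, not the
// exact Spark source.
trait DiffLossAggregatorSketch[Datum, Agg <: DiffLossAggregatorSketch[Datum, Agg]]
    extends Serializable { self: Agg =>

  protected var weightSum: Double = 0.0
  protected var lossSum: Double = 0.0
  protected val dim: Int
  protected lazy val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

  /** Subclasses fold one datum into lossSum/weightSum/gradientSumArray. */
  def add(instance: Datum): Agg

  /** In-place merge, exactly as in LinearSVCAggregator.merge above. */
  def merge(other: Agg): Agg = {
    if (other.weightSum != 0.0) {
      weightSum += other.weightSum
      lossSum += other.lossSum
      var i = 0
      while (i < dim) {
        gradientSumArray(i) += other.gradientSumArray(i)
        i += 1
      }
    }
    this
  }

  /** Weighted average loss over all instances seen so far. */
  def loss: Double = if (weightSum != 0.0) lossSum / weightSum else 0.0

  /** Weighted average gradient; a copy, so callers cannot mutate the sums. */
  def gradient: Array[Double] = {
    val result = gradientSumArray.clone()
    if (weightSum != 0.0) {
      var i = 0
      while (i < dim) { result(i) /= weightSum; i += 1 }
    }
    result
  }
}
```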
105 changes: 105 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala (new file)
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._

/**
* HingeAggregator computes the gradient and loss for the hinge loss function, as used in
* binary classification for instances in sparse or dense vectors, in an online fashion.
*
* Two HingeAggregators can be merged together to produce a summary of the loss and
* gradient over the corresponding joint dataset.
*
* This class standardizes feature values during computation using bcFeaturesStd.
*
* @param bcFeaturesStd The standard deviation values of the features.
* @param fitIntercept Whether to fit an intercept term.
* @param bcCoefficients The coefficients corresponding to the features.
*/
private[ml] class HingeAggregator(
bcFeaturesStd: Broadcast[Array[Double]],
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, HingeAggregator] {

private val numFeatures: Int = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
s" but got type ${bcCoefficients.value.getClass}.")
}
protected override val dim: Int = numFeaturesPlusIntercept

/**
* Add a new training instance to this HingeAggregator, and update the loss and gradient
* of the objective function.
*
* @param instance The data point to be added.
* @return This HingeAggregator object.
*/
def add(instance: Instance): this.type = {
instance match { case Instance(label, weight, features) =>
require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
s" Expecting $numFeatures but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

if (weight == 0.0) return this
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
val localGradientSumArray = gradientSumArray

val dotProduct = {
var sum = 0.0
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
sum += localCoefficients(index) * value / localFeaturesStd(index)
}
}
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
sum
}
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))).
// Therefore the subgradient is -(2y - 1) * x when the margin
// 1 - (2y - 1) * f_w(x) is positive, and zero otherwise.
val labelScaled = 2 * label - 1.0
val loss = if (1.0 > labelScaled * dotProduct) {
(1.0 - labelScaled * dotProduct) * weight
} else {
0.0
}

if (1.0 > labelScaled * dotProduct) {
val gradientScale = -labelScaled * weight
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
}
}
if (fitIntercept) {
localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
}
}

lossSum += loss
weightSum += weight
this
}
}
}
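To make the arithmetic in `add` concrete, here is a standalone worked example for a single instance, with unit feature standard deviations so the standardization divisions are no-ops (illustrative values only):

```scala
// Worked hinge-loss example for one instance; featuresStd assumed all 1.0.
object HingeExample extends App {
  val coef = Array(0.5, -0.25)  // coefficients, no intercept term
  val x = Array(1.0, 2.0)       // feature vector of one instance
  val y = 1.0                   // raw {0, 1} label
  val s = 2 * y - 1.0           // scaled label in {-1, +1}: here +1
  val margin = coef.zip(x).map { case (c, v) => c * v }.sum // f_w(x) = 0.0
  val loss = math.max(0.0, 1.0 - s * margin)                // 1.0 (inside margin)
  // Subgradient w.r.t. coef is -s * x while 1 - s * margin > 0, else zero.
  val grad = if (1.0 > s * margin) x.map(-s * _) else Array.fill(x.length)(0.0)
  println(s"margin=$margin loss=$loss grad=${grad.mkString(", ")}")
  // prints: margin=0.0 loss=1.0 grad=-1.0, -2.0
}
```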
mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -25,7 +25,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.classification.LinearSVCSuite._
import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -170,10 +171,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
assert(model2.intercept !== 0.0)
}

test("sparse coefficients in SVCAggregator") {
test("sparse coefficients in HingeAggregator") {
val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true)
val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
val thrown = withClue("HingeAggregator cannot handle sparse coefficients") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
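The updated test reflects the new constructor shape: iteration-invariant state first, broadcast coefficients curried last, matching how RDDLossFunction supplies them. The dense-only requirement remains because the hot loop indexes coefficientsArray directly. A hedged usage sketch (HingeAggregator is private[ml], so this only compiles inside the org.apache.spark.ml package; `sc` is an assumed live SparkContext):

```scala
val bcStd = sc.broadcast(Array(1.0))
val bcCoef = sc.broadcast(Vectors.dense(1.0))    // dense coefficients: accepted
val agg = new HingeAggregator(bcStd, true)(bcCoef)
agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))  // accumulates loss and gradient
// Broadcasting sparse coefficients, as the test does, throws
// IllegalArgumentException from the DenseVector pattern match.
```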