
[SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors #27360

Closed · wants to merge 10 commits
Changes from all commits
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
@@ -682,7 +682,6 @@ private[spark] object BLAS extends Serializable {

val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold {
with HasAggregationDepth with HasThreshold with HasBlockSize {

/**
* Param for threshold in binary classification prediction.
@@ -155,19 +155,26 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.
* Default is 4096.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
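For context, a minimal usage sketch of the new expert param (illustrative only, not part of this diff; it assumes a DataFrame `training` with the usual label/features columns):

    // Hypothetical usage: stack input rows into 1024-row matrices
    // instead of the default 4096.
    import org.apache.spark.ml.classification.LinearSVC

    val svc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      .setBlockSize(1024)  // expert param introduced by this change

    val model = svc.fit(training)  // `training` is an assumed DataFrame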

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE

val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -208,20 +215,33 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}

val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(getFeaturesStd)))
if ($(standardization)) None else Some(featuresStd.apply)))
} else {
None
}

val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
val standardized = instances.map {
Review comment (Contributor Author): Move the standardization outside of HingeAggregator, so that the input no longer needs to be standardized in each iteration.

case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
$(aggregationDepth))

def regParamL1Fun = (index: Int) => 0D
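As a standalone illustration of the review comment above (the names rawInstances, stdArray, and blockSize are assumptions for this sketch, not part of the diff, and InstanceBlock is private[spark], so this only compiles inside Spark's own source tree): features are divided by their column standard deviations exactly once, and the scaled rows are then grouped into matrix blocks, so each optimizer iteration reuses pre-scaled data.

    // Illustrative sketch only: standardize once up front, then blockify,
    // mirroring the hunk above.
    import org.apache.spark.ml.feature.{Instance, InstanceBlock}
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.rdd.RDD

    def standardizeAndBlockify(
        rawInstances: RDD[Instance],  // assumed input RDD
        stdArray: Array[Double],      // per-feature std, e.g. from Summarizer
        blockSize: Int): RDD[InstanceBlock] = {
      val numFeatures = stdArray.length
      val bcStd = rawInstances.context.broadcast(stdArray)
      val scaled = rawInstances.map { case Instance(label, weight, features) =>
        val std = bcStd.value
        val arr = Array.ofDim[Double](numFeatures)
        features.foreachNonZero { (i, v) => if (std(i) != 0) arr(i) = v / std(i) }
        Instance(label, weight, Vectors.dense(arr))
      }
      InstanceBlock.blokify(scaled, blockSize)  // one matrix per blockSize rows
    }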
@@ -238,6 +258,7 @@ class LinearSVC @Since("2.2.0") (
scaledObjectiveHistory += state.adjustedValue
}

blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -268,8 +289,6 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}

if (handlePersistence) instances.unpersist()

copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}
130 changes: 129 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,10 @@

package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.Vector
import scala.collection.mutable

import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD

/**
* Class that represents an instance of weighted data point with label and features.
@@ -28,6 +31,131 @@ import org.apache.spark.ml.linalg.Vector
*/
private[spark] case class Instance(label: Double, weight: Double, features: Vector)


/**
* Class that represents a block of instances.
* If all weights are 1, then an empty array is stored.
*/
private[spark] case class InstanceBlock(
labels: Array[Double],
weights: Array[Double],
matrix: Matrix) {
require(labels.length == matrix.numRows)
require(matrix.isTransposed)
if (weights.nonEmpty) {
require(labels.length == weights.length)
}

def size: Int = labels.length

def numFeatures: Int = matrix.numCols

def instanceIterator: Iterator[Instance] = {
if (weights.nonEmpty) {
labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
.map { case ((label, weight), vec) => Instance(label, weight, vec) }
} else {
labels.iterator.zip(matrix.rowIter)
.map { case (label, vec) => Instance(label, 1.0, vec) }
}
}

def getLabel(i: Int): Double = labels(i)

def labelIter: Iterator[Double] = labels.iterator

@transient lazy val getWeight: Int => Double = {
if (weights.nonEmpty) {
(i: Int) => weights(i)
} else {
(i: Int) => 1.0
}
}

def weightIter: Iterator[Double] = {
if (weights.nonEmpty) {
weights.iterator
} else {
Iterator.fill(size)(1.0)
}
}

// Directly get the non-zero iterator of the i-th row vector without array copy or slice
@transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
matrix match {
case dm: DenseMatrix =>
(i: Int) =>
val start = numFeatures * i
Iterator.tabulate(numFeatures)(j =>
(j, dm.values(start + j))
).filter(_._2 != 0)
case sm: SparseMatrix =>
(i: Int) =>
val start = sm.colPtrs(i)
val end = sm.colPtrs(i + 1)
Iterator.tabulate(end - start)(j =>
(sm.rowIndices(start + j), sm.values(start + j))
).filter(_._2 != 0)
}
}
}

private[spark] object InstanceBlock {

def fromInstances(instances: Seq[Instance]): InstanceBlock = {
val labels = instances.map(_.label).toArray
val weights = if (instances.exists(_.weight != 1)) {
instances.map(_.weight).toArray
} else {
Array.emptyDoubleArray
}
val numRows = instances.length
val numCols = instances.head.features.size
val denseSize = Matrices.getDenseSize(numCols, numRows)
val nnz = instances.iterator.map(_.features.numNonzeros).sum
val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
val matrix = if (denseSize < sparseSize) {
val values = Array.ofDim[Double](numRows * numCols)
var offset = 0
var j = 0
while (j < numRows) {
instances(j).features.foreachNonZero { (i, v) =>
values(offset + i) = v
}
offset += numCols
j += 1
}
new DenseMatrix(numRows, numCols, values, true)
} else {
val colIndices = mutable.ArrayBuilder.make[Int]
val values = mutable.ArrayBuilder.make[Double]
val rowPtrs = mutable.ArrayBuilder.make[Int]
var rowPtr = 0
rowPtrs += 0
var j = 0
while (j < numRows) {
var nnz = 0
instances(j).features.foreachNonZero { (i, v) =>
colIndices += i
values += v
nnz += 1
}
rowPtr += nnz
rowPtrs += rowPtr
j += 1
}
new SparseMatrix(numRows, numCols, rowPtrs.result(),
colIndices.result(), values.result(), true)
}
InstanceBlock(labels, weights, matrix)
}

def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
}


/**
* Case class that represents an instance of data point with
* label, weight, offset and features.
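For illustration, a small example of how the new InstanceBlock helpers fit together (the values are made up, and since InstanceBlock is private[spark] this would only compile inside Spark's own source or test tree):

    // Illustrative only: build a block from two instances and read it back.
    import org.apache.spark.ml.feature.{Instance, InstanceBlock}
    import org.apache.spark.ml.linalg.Vectors

    val instances = Seq(
      Instance(1.0, 1.0, Vectors.dense(0.0, 2.0)),
      Instance(0.0, 1.0, Vectors.sparse(2, Array(1), Array(3.0))))

    val block = InstanceBlock.fromInstances(instances)
    assert(block.size == 2)
    assert(block.numFeatures == 2)
    assert(block.weights.isEmpty)      // all weights are 1, so no array is stored
    assert(block.getWeight(0) == 1.0)  // getWeight still reports the implicit 1.0
    assert(block.instanceIterator.map(_.label).toSeq == Seq(1.0, 0.0))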