[SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
### What changes were proposed in this pull request?
1, stack input vectors into blocks (like ALS/MLP);
2, add a new param `blockSize`;
3, add a new class `InstanceBlock`;
4, standardize the input outside of the optimization procedure;

### Why are the changes needed?
1, reduce the RAM needed to persist the training dataset (~40% savings in tests);
2, use Level-2 BLAS routines (12%~28% faster, even without native BLAS);

### Does this PR introduce any user-facing change?
Yes, a new param `blockSize`.
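
As a rough illustration (not part of the patch), the new param is set like any other expert param; `training` here is an assumed `DataFrame` of `label`/`features` rows, and the values are only examples (4096 is the documented default):

```scala
import org.apache.spark.ml.classification.LinearSVC

// Configure a linear SVM; setBlockSize controls how many rows are
// stacked into each matrix block during training (default 4096).
val svc = new LinearSVC()
  .setMaxIter(100)
  .setRegParam(0.1)
  .setBlockSize(4096)

val model = svc.fit(training)  // training: DataFrame with label/features columns
```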

### How was this patch tested?
Existing and updated test suites.

Closes apache#27360 from zhengruifeng/blockify_svc.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
zhengruifeng committed Jan 28, 2020
1 parent 8aebc80 commit 96d2727
Showing 13 changed files with 407 additions and 53 deletions.
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
@@ -682,7 +682,6 @@ private[spark] object BLAS extends Serializable {

val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold {
with HasAggregationDepth with HasThreshold with HasBlockSize {

/**
* Param for threshold in binary classification prediction.
@@ -155,19 +155,26 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.
* Default is 4096.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE

val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -208,20 +215,33 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}

val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(getFeaturesStd)))
if ($(standardization)) None else Some(featuresStd.apply)))
} else {
None
}

val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
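    // Pre-scale each feature by 1 / std (features with zero variance are zeroed out),
    // so the aggregator no longer has to divide by the per-feature standard deviation
    // inside the optimization loop.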
val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
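    // Stack the standardized instances into blocks of up to `blockSize` rows and
    // persist the blocks instead of the raw instances: the blocked layout needs less
    // memory and lets the aggregator use level-2 BLAS routines per block.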
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
$(aggregationDepth))

def regParamL1Fun = (index: Int) => 0D
@@ -238,6 +258,7 @@ class LinearSVC @Since("2.2.0") (
scaledObjectiveHistory += state.adjustedValue
}

blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -268,8 +289,6 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}

if (handlePersistence) instances.unpersist()

copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}
130 changes: 129 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,10 @@

package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.Vector
import scala.collection.mutable

import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD

/**
* Class that represents an instance of weighted data point with label and features.
@@ -28,6 +31,131 @@ import org.apache.spark.ml.linalg.Vector
*/
private[spark] case class Instance(label: Double, weight: Double, features: Vector)


/**
* Class that represents a block of instances.
* If all weights are 1, then an empty array is stored.
*/
private[spark] case class InstanceBlock(
labels: Array[Double],
weights: Array[Double],
matrix: Matrix) {
require(labels.length == matrix.numRows)
require(matrix.isTransposed)
if (weights.nonEmpty) {
require(labels.length == weights.length)
}

def size: Int = labels.length

def numFeatures: Int = matrix.numCols

def instanceIterator: Iterator[Instance] = {
if (weights.nonEmpty) {
labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
.map { case ((label, weight), vec) => Instance(label, weight, vec) }
} else {
labels.iterator.zip(matrix.rowIter)
.map { case (label, vec) => Instance(label, 1.0, vec) }
}
}

def getLabel(i: Int): Double = labels(i)

def labelIter: Iterator[Double] = labels.iterator

@transient lazy val getWeight: Int => Double = {
if (weights.nonEmpty) {
(i: Int) => weights(i)
} else {
(i: Int) => 1.0
}
}

def weightIter: Iterator[Double] = {
if (weights.nonEmpty) {
weights.iterator
} else {
Iterator.fill(size)(1.0)
}
}

// Directly get the non-zero iterator of the i-th row vector, without an array copy or slice.
@transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
matrix match {
case dm: DenseMatrix =>
(i: Int) =>
val start = numFeatures * i
Iterator.tabulate(numFeatures)(j =>
(j, dm.values(start + j))
).filter(_._2 != 0)
case sm: SparseMatrix =>
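      // The matrix is row-major here (isTransposed), so colPtrs actually delimit rows
      // and rowIndices actually hold column indices.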
(i: Int) =>
val start = sm.colPtrs(i)
val end = sm.colPtrs(i + 1)
Iterator.tabulate(end - start)(j =>
(sm.rowIndices(start + j), sm.values(start + j))
).filter(_._2 != 0)
}
}
}

private[spark] object InstanceBlock {

def fromInstances(instances: Seq[Instance]): InstanceBlock = {
val labels = instances.map(_.label).toArray
val weights = if (instances.exists(_.weight != 1)) {
instances.map(_.weight).toArray
} else {
Array.emptyDoubleArray
}
val numRows = instances.length
val numCols = instances.head.features.size
val denseSize = Matrices.getDenseSize(numCols, numRows)
val nnz = instances.iterator.map(_.features.numNonzeros).sum
val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
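    // Pick whichever layout is smaller in memory: a row-major DenseMatrix, or a
    // row-major (CSR-style) SparseMatrix built from the per-row non-zeros.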
val matrix = if (denseSize < sparseSize) {
val values = Array.ofDim[Double](numRows * numCols)
var offset = 0
var j = 0
while (j < numRows) {
instances(j).features.foreachNonZero { (i, v) =>
values(offset + i) = v
}
offset += numCols
j += 1
}
new DenseMatrix(numRows, numCols, values, true)
} else {
val colIndices = mutable.ArrayBuilder.make[Int]
val values = mutable.ArrayBuilder.make[Double]
val rowPtrs = mutable.ArrayBuilder.make[Int]
var rowPtr = 0
rowPtrs += 0
var j = 0
while (j < numRows) {
var nnz = 0
instances(j).features.foreachNonZero { (i, v) =>
colIndices += i
values += v
nnz += 1
}
rowPtr += nnz
rowPtrs += rowPtr
j += 1
}
new SparseMatrix(numRows, numCols, rowPtrs.result(),
colIndices.result(), values.result(), true)
}
InstanceBlock(labels, weights, matrix)
}

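  // Group each partition's instances into blocks of at most `blockSize` rows;
  // the last block of a partition may hold fewer rows.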
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}
}


/**
* Case class that represents an instance of data point with
* label, weight, offset and features.