From 96d27274f54062a231ff12fd397a3cc051bc063d Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Tue, 28 Jan 2020 20:55:21 +0800
Subject: [PATCH] [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors

### What changes were proposed in this pull request?
1, stack input vectors to blocks (like ALS/MLP);
2, add new param `blockSize`;
3, add a new class `InstanceBlock`;
4, standardize the input outside of optimization procedure;

### Why are the changes needed?
1, reduce RAM to persist training dataset; (save ~40% in test)
2, use Level-2 BLAS routines; (12% ~ 28% faster, without native BLAS)

### Does this PR introduce any user-facing change?
a new param `blockSize`

### How was this patch tested?
existing and updated test suites

Closes #27360 from zhengruifeng/blockify_svc.

Authored-by: zhengruifeng
Signed-off-by: zhengruifeng
---
 .../spark/serializer/KryoSerializer.scala | 1 +
 .../org/apache/spark/ml/linalg/BLAS.scala | 1 -
 .../spark/ml/classification/LinearSVC.scala | 51 ++++---
 .../apache/spark/ml/feature/Instance.scala | 130 +++++++++++++++++-
 .../ml/optim/aggregator/HingeAggregator.scala | 121 ++++++++++++++--
 .../ml/param/shared/SharedParamsCodeGen.scala | 6 +-
 .../spark/ml/param/shared/sharedParams.scala | 19 +++
 .../ml/classification/LinearSVCSuite.scala | 2 +-
 .../spark/ml/feature/InstanceSuite.scala | 31 +++++
 .../aggregator/HingeAggregatorSuite.scala | 52 +++++--
 python/pyspark/ml/classification.py | 23 +++-
 .../ml/param/_shared_params_code_gen.py | 5 +-
 python/pyspark/ml/param/shared.py | 18 +++
 13 files changed, 407 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index cdaab599e2a0b..55ac2c410953b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
 "org.apache.spark.ml.attribute.NumericAttribute",
 "org.apache.spark.ml.feature.Instance",
+ "org.apache.spark.ml.feature.InstanceBlock",
 "org.apache.spark.ml.feature.LabeledPoint",
 "org.apache.spark.ml.feature.OffsetInstance",
 "org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index e054a15fc9b75..00e5b61dbdc18 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -682,7 +682,6 @@ private[spark] object BLAS extends Serializable {
 val xTemp = xValues(k) * alpha
 while (i < indEnd) {
- val rowIndex = Arows(i)
 yValues(Arows(i)) += Avals(i) * xTemp
 i += 1
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 905789090d625..6b1cdd8ad3963 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.optim.aggregator.HingeAggregator
 import
org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold { + with HasAggregationDepth with HasThreshold with HasBlockSize { /** * Param for threshold in binary classification prediction. @@ -155,19 +155,26 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set block size for stacking input data in matrices. + * Default is 4096. + * + * @group expertSetParam + */ + @Since("3.0.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr => - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - - val instances = extractInstances(dataset) - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, - regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) + regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize) + + val sc = dataset.sparkSession.sparkContext + val instances = extractInstances(dataset) val (summarizer, labelSummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( @@ -208,20 +215,33 @@ class LinearSVC @Since("2.2.0") ( throw new SparkException(msg) } - val featuresStd = summarizer.std.toArray - val getFeaturesStd = (j: Int) => featuresStd(j) + val featuresStd = summarizer.std.compressed + val bcFeaturesStd = sc.broadcast(featuresStd) val regParamL2 = $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) val regularization = if (regParamL2 != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(getFeaturesStd))) + if ($(standardization)) None else Some(featuresStd.apply))) } else { None } - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, + val standardized = instances.map { + case Instance(label, weight, features) => + val featuresStd = bcFeaturesStd.value + val array = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = featuresStd(i) + if (std != 0) array(i) = v / std + } + Instance(label, weight, Vectors.dense(array)) + } + val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_) + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) def regParamL1Fun = (index: Int) => 0D @@ -238,6 +258,7 @@ class LinearSVC @Since("2.2.0") ( scaledObjectiveHistory += state.adjustedValue } + blocks.unpersist() bcFeaturesStd.destroy() if (state == null) { val msg 
= s"${optimizer.getClass.getName} failed." @@ -268,8 +289,6 @@ class LinearSVC @Since("2.2.0") ( (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result()) } - if (handlePersistence) instances.unpersist() - copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 11d0c4689cbba..5476a86eb9d76 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -17,7 +17,10 @@ package org.apache.spark.ml.feature -import org.apache.spark.ml.linalg.Vector +import scala.collection.mutable + +import org.apache.spark.ml.linalg._ +import org.apache.spark.rdd.RDD /** * Class that represents an instance of weighted data point with label and features. @@ -28,6 +31,131 @@ import org.apache.spark.ml.linalg.Vector */ private[spark] case class Instance(label: Double, weight: Double, features: Vector) + +/** + * Class that represents an block of instance. + * If all weights are 1, then an empty array is stored. + */ +private[spark] case class InstanceBlock( + labels: Array[Double], + weights: Array[Double], + matrix: Matrix) { + require(labels.length == matrix.numRows) + require(matrix.isTransposed) + if (weights.nonEmpty) { + require(labels.length == weights.length) + } + + def size: Int = labels.length + + def numFeatures: Int = matrix.numCols + + def instanceIterator: Iterator[Instance] = { + if (weights.nonEmpty) { + labels.iterator.zip(weights.iterator).zip(matrix.rowIter) + .map { case ((label, weight), vec) => Instance(label, weight, vec) } + } else { + labels.iterator.zip(matrix.rowIter) + .map { case (label, vec) => Instance(label, 1.0, vec) } + } + } + + def getLabel(i: Int): Double = labels(i) + + def labelIter: Iterator[Double] = labels.iterator + + @transient lazy val getWeight: Int => Double = { + if (weights.nonEmpty) { + (i: Int) => weights(i) + } else { + (i: Int) => 1.0 + } + } + + def weightIter: Iterator[Double] = { + if (weights.nonEmpty) { + weights.iterator + } else { + Iterator.fill(size)(1.0) + } + } + + // directly get the non-zero iterator of i-th row vector without array copy or slice + @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = { + matrix match { + case dm: DenseMatrix => + (i: Int) => + val start = numFeatures * i + Iterator.tabulate(numFeatures)(j => + (j, dm.values(start + j)) + ).filter(_._2 != 0) + case sm: SparseMatrix => + (i: Int) => + val start = sm.colPtrs(i) + val end = sm.colPtrs(i + 1) + Iterator.tabulate(end - start)(j => + (sm.rowIndices(start + j), sm.values(start + j)) + ).filter(_._2 != 0) + } + } +} + +private[spark] object InstanceBlock { + + def fromInstances(instances: Seq[Instance]): InstanceBlock = { + val labels = instances.map(_.label).toArray + val weights = if (instances.exists(_.weight != 1)) { + instances.map(_.weight).toArray + } else { + Array.emptyDoubleArray + } + val numRows = instances.length + val numCols = instances.head.features.size + val denseSize = Matrices.getDenseSize(numCols, numRows) + val nnz = instances.iterator.map(_.features.numNonzeros).sum + val sparseSize = Matrices.getSparseSize(nnz, numRows + 1) + val matrix = if (denseSize < sparseSize) { + val values = Array.ofDim[Double](numRows * numCols) + var offset = 0 + var j = 0 + while (j < numRows) { + instances(j).features.foreachNonZero { (i, v) => + values(offset + i) = v + } + offset += 
numCols + j += 1 + } + new DenseMatrix(numRows, numCols, values, true) + } else { + val colIndices = mutable.ArrayBuilder.make[Int] + val values = mutable.ArrayBuilder.make[Double] + val rowPtrs = mutable.ArrayBuilder.make[Int] + var rowPtr = 0 + rowPtrs += 0 + var j = 0 + while (j < numRows) { + var nnz = 0 + instances(j).features.foreachNonZero { (i, v) => + colIndices += i + values += v + nnz += 1 + } + rowPtr += nnz + rowPtrs += rowPtr + j += 1 + } + new SparseMatrix(numRows, numCols, rowPtrs.result(), + colIndices.result(), values.result(), true) + } + InstanceBlock(labels, weights, matrix) + } + + def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { + instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) + } +} + + /** * Case class that represents an instance of data point with * label, weight, offset and features. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b0906f1b06511..25f7c9ddab42d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ /** @@ -32,21 +32,43 @@ import org.apache.spark.ml.linalg._ * * @param bcCoefficients The coefficients corresponding to the features. * @param fitIntercept Whether to fit an intercept term. - * @param bcFeaturesStd The standard deviation values of the features. 
*/ private[ml] class HingeAggregator( - bcFeaturesStd: Broadcast[Array[Double]], - fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[Instance, HingeAggregator] { + numFeatures: Int, + fitIntercept: Boolean, + blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { - private val numFeatures: Int = bcFeaturesStd.value.length private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + protected override val dim: Int = numFeaturesPlusIntercept @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + s" but got type ${bcCoefficients.value.getClass}.") } - protected override val dim: Int = numFeaturesPlusIntercept + + @transient private lazy val linear = { + if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } + } + + @transient private lazy val intercept = + if (fitIntercept) coefficientsArray(numFeatures) else 0.0 + + @transient private lazy val linearGradSumVec = { + if (fitIntercept) { + new DenseVector(Array.ofDim[Double](numFeatures)) + } else { + null + } + } + + @transient private lazy val auxiliaryVec = + new DenseVector(Array.ofDim[Double](blockSize)) + /** * Add a new training instance to this HingeAggregator, and update the loss and gradient @@ -62,16 +84,13 @@ private[ml] class HingeAggregator( require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientSumArray = gradientSumArray val dotProduct = { var sum = 0.0 features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } + sum += localCoefficients(index) * value } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -88,9 +107,7 @@ private[ml] class HingeAggregator( if (1.0 > labelScaled * dotProduct) { val gradientScale = -labelScaled * weight features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) - } + localGradientSumArray(index) += value * gradientScale } if (fitIntercept) { localGradientSumArray(localGradientSumArray.length - 1) += gradientScale @@ -102,4 +119,80 @@ private[ml] class HingeAggregator( this } } + + /** + * Add a new training instance block to this HingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The InstanceBlock to be added. + * @return This HingeAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + val size = block.size + val localGradientSumArray = gradientSumArray + + // vec here represents dotProducts + val vec = if (size == blockSize) { + auxiliaryVec + } else { + // the last block within one partition may be of size less than blockSize + new DenseVector(Array.ofDim[Double](size)) + } + + if (fitIntercept) { + var i = 0 + while (i < size) { + vec.values(i) = intercept + i += 1 + } + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) + } else { + BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) + } + + // in-place convert dotProducts to gradient scales + // then, vec represents gradient scales + var i = 0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val label = block.getLabel(i) + val labelScaled = 2 * label - 1.0 + val loss = (1.0 - labelScaled * vec(i)) * weight + if (loss > 0) { + lossSum += loss + val gradScale = -labelScaled * weight + vec.values(i) = gradScale + } else { + vec.values(i) = 0.0 + } + } else { + vec.values(i) = 0.0 + } + i += 1 + } + + // predictions are all correct, no gradient signal + if (vec.values.forall(_ == 0)) return this + + if (fitIntercept) { + BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) + linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + localGradientSumArray(numFeatures) += vec.values.sum + } else { + val gradSumVec = new DenseVector(localGradientSumArray) + BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) + } + + this + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 7ac680ec1183a..eee75e7f5b722 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation.") + "validation."), + ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + + "stacked within partitions. If block size is more than remaining data in a partition " + + "then it is adjusted to the size of this data.", Some("4096"), + isValid = "ParamValidators.gt(0)", isExpertParam = true) ) val code = genSharedParams(params) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 44c993eeafddc..3d1c55a5eb429 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -578,4 +578,23 @@ trait HasValidationIndicatorCol extends Params { /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } + +/** + * Trait for shared param blockSize (default: 4096). 
This trait may be changed or + * removed between minor versions. + */ +@DeveloperApi +trait HasBlockSize extends Params { + + /** + * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.. + * @group expertParam + */ + final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0)) + + setDefault(blockSize, 4096) + + /** @group expertGetParam */ + final def getBlockSize: Int = $(blockSize) +} // scalastyle:on diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index c2072cea11859..2b63dc259a14f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -179,7 +179,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { test("sparse coefficients in HingeAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val agg = new HingeAggregator(1, true)(bcCoefficients) val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 5a74490058398..d780bdf5f5dc8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{ val o2 = ser.deserialize[OffsetInstance](ser.serialize(o)) assert(o === o2) } + + val block1 = InstanceBlock.fromInstances(Seq(instance1)) + val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2)) + Seq(block1, block2).foreach { o => + val o2 = ser.deserialize[InstanceBlock](ser.serialize(o)) + assert(o.labels === o2.labels) + assert(o.weights === o2.weights) + assert(o.matrix === o2.matrix) + } + } + + test("InstanceBlock: check correctness") { + val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)) + val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse) + val instances = Seq(instance1, instance2) + + val block = InstanceBlock.fromInstances(instances) + assert(block.size === 2) + assert(block.numFeatures === 2) + block.instanceIterator.zipWithIndex.foreach { + case (instance, i) => + assert(instance.label === instances(i).label) + assert(instance.weight === instances(i).weight) + assert(instance.features.toArray === instances(i).features.toArray) + } + Seq(0, 1).foreach { i => + val nzIter = block.getNonZeroIter(i) + val vec = Vectors.sparse(2, nzIter.toSeq) + assert(vec.toArray === instances(i).features.toArray) + } } + } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 61b48ffa10944..c02a0a5e5e7d0 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -32,21 +32,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() - instances = Array( + instances = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) - ) - instancesConstantFeature = Array( + )) + instancesConstantFeature = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) - ) - instancesConstantFeatureFiltered = Array( + )) + instancesConstantFeatureFiltered = standardize(Array( Instance(0.0, 0.1, Vectors.dense(2.0)), Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) - ) + )) } /** Get summary statistics for some data and create a new HingeAggregator. */ @@ -54,12 +54,23 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { instances: Array[Instance], coefficients: Vector, fitIntercept: Boolean): HingeAggregator = { - val (featuresSummarizer, ySummarizer) = - DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) - val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) - val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) + new HingeAggregator(instances.head.features.size, fitIntercept)(bcCoefficients) + } + + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt) + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } } test("aggregator add method input size") { @@ -160,4 +171,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("add instance block") { + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + instances.foreach(agg.add) + + val agg2 = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + val block = InstanceBlock.fromInstances(instances) + agg2.add(block) + + assert(agg.loss ~== agg2.loss relTol 1e-8) + assert(agg.gradient ~== agg2.gradient relTol 1e-8) + } + } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 5ab8e606bda03..89d27fbfa316e 
100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -165,7 +165,8 @@ def predictProbability(self, value): class _LinearSVCParams(_JavaClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, - HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold): + HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, + HasBlockSize): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. @@ -214,6 +215,8 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable LinearSVCModel... >>> model.getThreshold() 0.5 + >>> model.getBlockSize() + 4096 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -252,18 +255,19 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, - standardization=True, threshold=0.0, aggregationDepth=2) + standardization=True, threshold=0.0, aggregationDepth=2, + blockSize=4096) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -272,12 +276,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -342,6 +346,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.0.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): """ diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index ded3ca84b30f2..3994625c05f1b 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -164,7 +164,10 @@ def get$Name(self): "'euclidean'", "TypeConverters.toString"), ("validationIndicatorCol", "name of the column that indicates whether each row is for " + "training or for validation. 
False indicates training; true indicates validation.", - None, "TypeConverters.toString")] + None, "TypeConverters.toString"), + ("blockSize", "block size for stacking input data in matrices. Data is stacked within " + "partitions. If block size is more than remaining data in a partition then it is " + "adjusted to the size of this data.", "4096", "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 8fc115691f1ab..41ba7b9dc5523 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -580,3 +580,21 @@ def getValidationIndicatorCol(self): Gets the value of validationIndicatorCol or its default value. """ return self.getOrDefault(self.validationIndicatorCol) + + +class HasBlockSize(Params): + """ + Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. + """ + + blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasBlockSize, self).__init__() + self._setDefault(blockSize=4096) + + def getBlockSize(self): + """ + Gets the value of blockSize or its default value. + """ + return self.getOrDefault(self.blockSize)
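
For readers who want the shape of the change without walking the diff: the block-level path replaces per-row dot products with one matrix-vector pass over a stacked block, then folds the per-row hinge gradients back through the transposed block. The plain-Scala sketch below is illustrative only — it uses dense arrays and simple loops in place of `BLAS.gemv`, `InstanceBlock`, and the aggregator's internal state, and the object and method names are invented for this note — but it follows the three steps that `HingeAggregator.add(block)` performs.

```scala
// Illustrative sketch of blockified hinge aggregation, assuming dense features
// and {0, 1} labels. Not the patch's code; the patch uses BLAS.gemv instead of loops.
object BlockifiedHingeSketch {

  // Compute the hinge loss and gradient contribution of one block at once.
  // `values` is the block in row-major layout, size = labels.length * numFeatures.
  // Returns (lossSum, gradient of length numFeatures + 1, last slot = intercept term).
  def addBlock(
      labels: Array[Double],
      weights: Array[Double],
      values: Array[Double],
      numFeatures: Int,
      coefficients: Array[Double],
      intercept: Double): (Double, Array[Double]) = {
    val numRows = labels.length

    // Step 1: margins = block * coefficients + intercept (one Level-2 style pass,
    // a single gemv in the patch).
    val margins = Array.fill(numRows)(intercept)
    var i = 0
    while (i < numRows) {
      val offset = i * numFeatures
      var j = 0
      while (j < numFeatures) {
        margins(i) += values(offset + j) * coefficients(j)
        j += 1
      }
      i += 1
    }

    // Step 2: turn margins into per-row gradient scales, accumulating the loss.
    // Loss with {0, 1} labels is max(0, 1 - (2y - 1) * margin) * weight.
    var lossSum = 0.0
    val gradScales = Array.ofDim[Double](numRows)
    i = 0
    while (i < numRows) {
      val labelScaled = 2.0 * labels(i) - 1.0   // map {0, 1} to {-1, 1}
      val loss = (1.0 - labelScaled * margins(i)) * weights(i)
      if (loss > 0) {
        lossSum += loss
        gradScales(i) = -labelScaled * weights(i)
      }
      i += 1
    }

    // Step 3: gradient = block^T * gradScales; intercept gradient = sum of scales
    // (the second gemv in the patch).
    val gradient = Array.ofDim[Double](numFeatures + 1)
    i = 0
    while (i < numRows) {
      val offset = i * numFeatures
      var j = 0
      while (j < numFeatures) {
        gradient(j) += values(offset + j) * gradScales(i)
        j += 1
      }
      gradient(numFeatures) += gradScales(i)
      i += 1
    }
    (lossSum, gradient)
  }
}
```

In the patch itself, Steps 1 and 3 are single `BLAS.gemv` calls over `block.matrix`, which is what the commit message credits for the 12% ~ 28% speedup even without a native BLAS, and persisting blocks instead of individual `Instance` rows is what saves the reported ~40% of cached RAM.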
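
A hedged usage example of the new expert param, in spark-shell style; the `SparkSession` named `spark` and the libsvm dataset path are assumptions for illustration, not part of the patch:

```scala
import org.apache.spark.ml.classification.LinearSVC

// Load a toy binary-classification dataset (path is an assumption).
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val svc = new LinearSVC()
  .setMaxIter(100)
  .setRegParam(0.1)
  .setBlockSize(1024)   // stack up to 1024 rows per block; default is 4096

val model = svc.fit(training)
println(s"blockSize used for training: ${model.getBlockSize}")
```

The default remains 4096; as the param doc above notes, the last block in a partition may hold fewer rows than `blockSize`.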