From 0dafab5aa5910709eb446072d16e72dc01ff0592 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 7 Feb 2020 15:47:49 +0800 Subject: [PATCH] Revert "[SPARK-30659][ML][PYSPARK] LogisticRegression blockify input vectors" This reverts commit 073ce125436a8ad55470057718a14c92b6b5b939. --- .../spark/ml/classification/LinearSVC.scala | 4 +- .../classification/LogisticRegression.scala | 58 ++-- .../ml/optim/aggregator/HingeAggregator.scala | 31 ++- .../optim/aggregator/LogisticAggregator.scala | 252 ++---------------- .../ml/param/shared/SharedParamsCodeGen.scala | 4 +- .../spark/ml/param/shared/sharedParams.scala | 12 +- .../classification/LogisticRegression.scala | 4 +- .../LogisticRegressionSuite.scala | 4 +- .../aggregator/LogisticAggregatorSuite.scala | 56 +--- python/pyspark/ml/classification.py | 34 +-- .../ml/param/_shared_params_code_gen.py | 2 +- python/pyspark/ml/param/shared.py | 2 +- 12 files changed, 113 insertions(+), 350 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index f16648d2abee6..6b1cdd8ad3963 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -157,7 +157,7 @@ class LinearSVC @Since("2.2.0") ( /** * Set block size for stacking input data in matrices. - * Default is 1024. + * Default is 4096. * * @group expertSetParam */ @@ -240,7 +240,7 @@ class LinearSVC @Since("2.2.0") ( .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training dataset (blockSize=${$(blockSize)})") - val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_) + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 9b5b36257a584..50c14d086957f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol - with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth - with HasBlockSize { + with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") ( @Since("2.2.0") def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) - /** - * Set block size for stacking input data in matrices. - * Default is 1024. - * - * @group expertSetParam - */ - @Since("3.0.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) - private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, numFeatures: Int): Unit = { @@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train( - dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr => + override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + train(dataset, handlePersistence) + } + + protected[spark] def train( + dataset: Dataset[_], + handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr => + val instances = extractInstances(dataset) + + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) - val sc = dataset.sparkSession.sparkContext - val instances = extractInstances(dataset) - val (summarizer, labelSummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => @@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") ( s"dangerous ground, so the algorithm may not converge.") } - val featuresMean = summarizer.mean.compressed - val featuresStd = summarizer.std.compressed - val bcFeaturesStd = sc.broadcast(featuresStd) + val featuresMean = summarizer.mean.toArray + val featuresStd = summarizer.std.toArray if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { @@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") ( val regParamL1 = $(elasticNetParam) * $(regParam) val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) - val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept), + val bcFeaturesStd = instances.context.broadcast(featuresStd) + val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), multinomial = isMultinomial)(_) val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { featuresStd(j / numCoefficientSets) @@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") ( None } - val standardized = instances.map { - case Instance(label, weight, features) => - val featuresStd = bcFeaturesStd.value - val array = Array.ofDim[Double](numFeatures) - features.foreachNonZero { (i, v) => - val std = featuresStd(i) - if (std != 0) array(i) = v / std - } - Instance(label, weight, Vectors.dense(array)) - } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) - .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training dataset (blockSize=${$(blockSize)})") - - val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, + val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets @@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") ( state = states.next() arrayBuilder += state.adjustedValue } - blocks.unpersist() bcFeaturesStd.destroy() if (state == null) { @@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") ( } } + if (handlePersistence) instances.unpersist() + val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector, numClasses, isMultinomial)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 292187b3e146e..25f7c9ddab42d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -35,7 +35,8 @@ import org.apache.spark.ml.linalg._ */ private[ml] class HingeAggregator( numFeatures: Int, - fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + fitIntercept: Boolean, + blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures @@ -54,6 +55,20 @@ private[ml] class HingeAggregator( } } + @transient private lazy val intercept = + if (fitIntercept) coefficientsArray(numFeatures) else 0.0 + + @transient private lazy val linearGradSumVec = { + if (fitIntercept) { + new DenseVector(Array.ofDim[Double](numFeatures)) + } else { + null + } + } + + @transient private lazy val auxiliaryVec = + new DenseVector(Array.ofDim[Double](blockSize)) + /** * Add a new training instance to this HingeAggregator, and update the loss and gradient @@ -123,14 +138,19 @@ private[ml] class HingeAggregator( val localGradientSumArray = gradientSumArray // vec here represents dotProducts - val vec = if (fitIntercept && coefficientsArray.last != 0) { - val intercept = coefficientsArray.last - new DenseVector(Array.fill(size)(intercept)) + val vec = if (size == blockSize) { + auxiliaryVec } else { + // the last block within one partition may be of size less than blockSize new DenseVector(Array.ofDim[Double](size)) } if (fitIntercept) { + var i = 0 + while (i < size) { + vec.values(i) = intercept + i += 1 + } BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) @@ -165,9 +185,6 @@ private[ml] class HingeAggregator( if (vec.values.forall(_ == 0)) return this if (fitIntercept) { - // localGradientSumArray is of size numFeatures+1, so can not - // be directly used as the output of BLAS.gemv - val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } localGradientSumArray(numFeatures) += vec.values.sum diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 76d21995a2c50..f2b3566f8f09e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.{Instance, InstanceBlock} -import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{DenseVector, Vector} import org.apache.spark.mllib.util.MLUtils /** @@ -171,6 +171,7 @@ import org.apache.spark.mllib.util.MLUtils * * * @param bcCoefficients The broadcast coefficients corresponding to the features. + * @param bcFeaturesStd The broadcast standard deviation values of the features. * @param numClasses the number of possible outcomes for k classes classification problem in * Multinomial Logistic Regression. * @param fitIntercept Whether to fit an intercept term. @@ -182,12 +183,13 @@ import org.apache.spark.mllib.util.MLUtils * since this form is optimal for the matrix operations used for prediction. */ private[ml] class LogisticAggregator( - numFeatures: Int, + bcFeaturesStd: Broadcast[Array[Double]], numClasses: Int, fitIntercept: Boolean, multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { + extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging { + private val numFeatures = bcFeaturesStd.value.length private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size protected override val dim: Int = coefficientSize @@ -207,31 +209,6 @@ private[ml] class LogisticAggregator( s"got type ${bcCoefficients.value.getClass}.)") } - @transient private lazy val binaryLinear = { - if (!multinomial) { - if (fitIntercept) { - new DenseVector(coefficientsArray.take(numFeatures)) - } else { - new DenseVector(coefficientsArray) - } - } else { - null - } - } - - @transient private lazy val multinomialLinear = { - if (multinomial) { - if (fitIntercept) { - new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures)) - } else { - new DenseMatrix(numClasses, numFeatures, coefficientsArray) - } - } else { - null - } - } - - if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + s"coefficients for positive and negative classes. When no regularization is applied, the" + @@ -242,12 +219,15 @@ private[ml] class LogisticAggregator( /** Update gradient and loss using binary loss function. */ private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { + val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray val margin = - { var sum = 0.0 features.foreachNonZero { (index, value) => - sum += localCoefficients(index) * value + if (localFeaturesStd(index) != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -256,7 +236,9 @@ private[ml] class LogisticAggregator( val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) features.foreachNonZero { (index, value) => - localGradientArray(index) += multiplier * value + if (localFeaturesStd(index) != 0.0) { + localGradientArray(index) += multiplier * value / localFeaturesStd(index) + } } if (fitIntercept) { @@ -271,61 +253,6 @@ private[ml] class LogisticAggregator( } } - /** Update gradient and loss using binary loss function. */ - private def binaryUpdateInPlace(block: InstanceBlock): Unit = { - val size = block.size - val localGradientSumArray = gradientSumArray - - // vec here represents margins or negative dotProducts - val vec = if (fitIntercept && coefficientsArray.last != 0) { - val intercept = coefficientsArray.last - new DenseVector(Array.fill(size)(intercept)) - } else { - new DenseVector(Array.ofDim[Double](size)) - } - - if (fitIntercept) { - BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) - } else { - BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec) - } - - // in-place convert margins to multiplier - // then, vec represents multiplier - var i = 0 - while (i < size) { - val weight = block.getWeight(i) - if (weight > 0) { - weightSum += weight - val label = block.getLabel(i) - val margin = vec(i) - if (label > 0) { - // The following is equivalent to log(1 + exp(margin)) but more numerically stable. - lossSum += weight * MLUtils.log1pExp(margin) - } else { - lossSum += weight * (MLUtils.log1pExp(margin) - margin) - } - val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) - vec.values(i) = multiplier - } else { - vec.values(i) = 0.0 - } - i += 1 - } - - if (fitIntercept) { - // localGradientSumArray is of size numFeatures+1, so can not - // be directly used as the output of BLAS.gemv - val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) - BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) - linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } - localGradientSumArray(numFeatures) += vec.values.sum - } else { - val gradSumVec = new DenseVector(localGradientSumArray) - BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) - } - } - /** Update gradient and loss using multinomial (softmax) loss function. */ private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations @@ -333,6 +260,7 @@ private[ml] class LogisticAggregator( Note: this can still be used when numClasses = 2 for binary logistic regression without pivoting. */ + val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray @@ -342,10 +270,13 @@ private[ml] class LogisticAggregator( val margins = new Array[Double](numClasses) features.foreachNonZero { (index, value) => - var j = 0 - while (j < numClasses) { - margins(j) += localCoefficients(index * numClasses + j) * value - j += 1 + if (localFeaturesStd(index) != 0.0) { + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + margins(j) += localCoefficients(index * numClasses + j) * stdValue + j += 1 + } } } var i = 0 @@ -383,10 +314,13 @@ private[ml] class LogisticAggregator( multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) } features.foreachNonZero { (index, value) => - var j = 0 - while (j < numClasses) { - localGradientArray(index * numClasses + j) += weight * multipliers(j) * value - j += 1 + if (localFeaturesStd(index) != 0.0) { + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue + j += 1 + } } } if (fitIntercept) { @@ -405,112 +339,6 @@ private[ml] class LogisticAggregator( lossSum += weight * loss } - /** Update gradient and loss using multinomial (softmax) loss function. */ - private def multinomialUpdateInPlace(block: InstanceBlock): Unit = { - val size = block.size - val localGradientSumArray = gradientSumArray - - // mat here represents margins, shape: S X C - val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) - - if (fitIntercept) { - val intercept = coefficientsArray.takeRight(numClasses) - var i = 0 - while (i < size) { - var j = 0 - while (j < numClasses) { - mat.update(i, j, intercept(j)) - j += 1 - } - i += 1 - } - BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat) - } else { - BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat) - } - - // in-place convert margins to multipliers - // then, mat represents multipliers - var i = 0 - val tmp = Array.ofDim[Double](numClasses) - while (i < size) { - val weight = block.getWeight(i) - if (weight > 0) { - weightSum += weight - val label = block.getLabel(i) - - var maxMargin = Double.NegativeInfinity - var j = 0 - while (j < numClasses) { - tmp(j) = mat(i, j) - maxMargin = math.max(maxMargin, tmp(j)) - j += 1 - } - - // marginOfLabel is margins(label) in the formula - val marginOfLabel = tmp(label.toInt) - - var sum = 0.0 - j = 0 - while (j < numClasses) { - if (maxMargin > 0) tmp(j) -= maxMargin - val exp = math.exp(tmp(j)) - sum += exp - tmp(j) = exp - j += 1 - } - - j = 0 - while (j < numClasses) { - val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0)) - mat.update(i, j, multiplier) - j += 1 - } - - if (maxMargin > 0) { - lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) - } else { - lossSum += weight * (math.log(sum) - marginOfLabel) - } - } else { - var j = 0 - while (j < numClasses) { - mat.update(i, j, 0.0) - j += 1 - } - } - i += 1 - } - - // block.matrix: S X F, unknown type - // mat (multipliers): S X C, dense - // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense - block.matrix match { - case dm: DenseMatrix if !fitIntercept => - // If fitIntercept==false, gradientSumArray += mat.T X matrix - // GEMM requires block.matrix is dense - val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray) - BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) - - case _ => - // Otherwise, use linearGradSumMat (F X C) as a temp matrix: - // linearGradSumMat = matrix.T X mat - val linearGradSumMat = new DenseMatrix(numFeatures, numClasses, - Array.ofDim[Double](numFeatures * numClasses)) - BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat) - linearGradSumMat.foreachActive { (i, j, v) => - if (v != 0) localGradientSumArray(i * numClasses + j) += v - } - - if (fitIntercept) { - val start = numClasses * numFeatures - mat.foreachActive { (i, j, v) => - if (v != 0) localGradientSumArray(start + j) += v - } - } - } - } - /** * Add a new training instance to this LogisticAggregator, and update the loss and gradient * of the objective function. @@ -535,28 +363,4 @@ private[ml] class LogisticAggregator( this } } - - /** - * Add a new training instance block to this LogisticAggregator, and update the loss and gradient - * of the objective function. - * - * @param block The instance block of data point to be added. - * @return This LogisticAggregator object. - */ - def add(block: InstanceBlock): this.type = { - require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + - s"instance. Expecting $numFeatures but got ${block.numFeatures}.") - require(block.weightIter.forall(_ >= 0), - s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") - - if (block.weightIter.forall(_ == 0)) return this - - if (multinomial) { - multinomialUpdateInPlace(block) - } else { - binaryUpdateInPlace(block) - } - - this - } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 3d1fab8692af7..eee75e7f5b722 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,10 +104,10 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation"), + "validation."), ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + "stacked within partitions. If block size is more than remaining data in a partition " + - "then it is adjusted to the size of this data", Some("1024"), + "then it is adjusted to the size of this data.", Some("4096"), isValid = "ParamValidators.gt(0)", isExpertParam = true) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 7fe8ccd973a72..3d1c55a5eb429 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -570,29 +570,29 @@ trait HasDistanceMeasure extends Params { trait HasValidationIndicatorCol extends Params { /** - * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation. + * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.. * @group param */ - final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation") + final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.") /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } /** - * Trait for shared param blockSize (default: 1024). This trait may be changed or + * Trait for shared param blockSize (default: 4096). This trait may be changed or * removed between minor versions. */ @DeveloperApi trait HasBlockSize extends Params { /** - * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. + * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.. * @group expertParam */ - final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0)) + final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0)) - setDefault(blockSize, 1024) + setDefault(blockSize, 4096) /** @group expertGetParam */ final def getBlockSize: Int = $(blockSize) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index f88f3fce61b33..21eb17dfaacb3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -339,8 +339,10 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) + // Determine if we should cache the DF + val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df) + val mlLogisticRegressionModel = lr.train(df, handlePersistence) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 9e359ba098bfb..6d31e6efc7e1c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { test("sparse coefficients in LogisticAggregator") { val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val binaryAgg = new LogisticAggregator(1, 2, + val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2, fitIntercept = true, multinomial = false)(bcCoefficientsBinary) val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { @@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { assert(thrownBinary.getMessage.contains("coefficients only supports dense")) val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0))) - val multinomialAgg = new LogisticAggregator(1, 3, + val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3, fitIntercept = true, multinomial = true)(bcCoefficientsMulti) val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index 83718076dde7b..e699adcc14c03 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() - instances = standardize(Array( + instances = Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), Instance(2.0, 0.3, Vectors.dense(4.0, 0.5)) - )) - instancesConstantFeature = standardize(Array( + ) + instancesConstantFeature = Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), Instance(2.0, 0.3, Vectors.dense(1.0, 0.5)) - )) - instancesConstantFeatureFiltered = standardize(Array( + ) + instancesConstantFeatureFiltered = Array( Instance(0.0, 0.1, Vectors.dense(2.0)), Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) - )) + ) } /** Get summary statistics for some data and create a new LogisticAggregator. */ @@ -55,27 +55,13 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { coefficients: Vector, fitIntercept: Boolean, isMultinomial: Boolean): LogisticAggregator = { - val (_, ySummarizer) = + val (featuresSummarizer, ySummarizer) = DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) val numClasses = ySummarizer.histogram.length - val numFeatures = instances.head.features.size + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients) - } - - private def standardize(instances: Array[Instance]): Array[Instance] = { - val (featuresSummarizer, _) = - DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) - val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt) - val numFeatures = stdArray.length - instances.map { case Instance(label, weight, features) => - val standardized = Array.ofDim[Double](numFeatures) - features.foreachNonZero { (i, v) => - val std = stdArray(i) - if (std != 0) standardized(i) = v / std - } - Instance(label, weight, Vectors.dense(standardized).compressed) - } + new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) } test("aggregator add method input size") { @@ -291,24 +277,4 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { validateGradient(aggConstantFeatureBinary.gradient, aggConstantFeatureBinaryFiltered.gradient, 1) } - - test("add instance block") { - val binaryInstances = instances.map { instance => - if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features) - } - val coefArray = Array(1.0, 2.0) - val intercept = 1.0 - - val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true, isMultinomial = false) - binaryInstances.foreach(agg.add) - - val agg2 = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true, isMultinomial = false) - val block = InstanceBlock.fromInstances(binaryInstances) - agg2.add(block) - - assert(agg.loss ~== agg2.loss relTol 1e-8) - assert(agg.gradient ~== agg2.gradient relTol 1e-8) - } } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index bb9cd034808fc..89d27fbfa316e 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -216,7 +216,7 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable >>> model.getThreshold() 0.5 >>> model.getBlockSize() - 1024 + 4096 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -255,19 +255,19 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=1024): + aggregationDepth=2, blockSize=4096): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=1024): + aggregationDepth=2, blockSize=4096): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, threshold=0.0, aggregationDepth=2, - blockSize=1024) + blockSize=4096) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -276,12 +276,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=1024): + aggregationDepth=2, blockSize=4096): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=1024): + aggregationDepth=2, blockSize=4096): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -388,7 +388,7 @@ def intercept(self): class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold, HasBlockSize): + HasThreshold): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -570,8 +570,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams, 10 >>> blor.clear(blor.maxIter) >>> blorModel = blor.fit(bdf) - >>> blorModel.getBlockSize() - 1024 >>> blorModel.setFeaturesCol("features") LogisticRegressionModel... >>> blorModel.setProbabilityCol("newProbability") @@ -640,7 +638,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -649,14 +647,13 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", - blockSize=1024) + self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -669,7 +666,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -677,7 +674,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ @@ -772,13 +769,6 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) - @since("3.0.0") - def setBlockSize(self, value): - """ - Sets the value of :py:attr:`blockSize`. - """ - return self._set(blockSize=value) - class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams, JavaMLWritable, JavaMLReadable, HasTrainingSummary): diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index fb4d55d57a2df..3994625c05f1b 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -167,7 +167,7 @@ def get$Name(self): None, "TypeConverters.toString"), ("blockSize", "block size for stacking input data in matrices. Data is stacked within " "partitions. If block size is more than remaining data in a partition then it is " - "adjusted to the size of this data.", "1024", "TypeConverters.toInt")] + "adjusted to the size of this data.", "4096", "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 456463580878c..41ba7b9dc5523 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -591,7 +591,7 @@ class HasBlockSize(Params): def __init__(self): super(HasBlockSize, self).__init__() - self._setDefault(blockSize=1024) + self._setDefault(blockSize=4096) def getBlockSize(self): """