### What changes were proposed in this pull request?
Revert the following PRs:
- #27360
- #27396
- #27374
- #27389

### Why are the changes needed?
BLAS needs more performance tests, especially on sparse datasets.
Performance tests of LogisticRegression (#27374) on sparse datasets show that blockifying vectors into matrices and using BLAS causes a performance regression.
LinearSVC and LinearRegression were updated in the same way as LogisticRegression, so we need to revert them as well to make sure there is no regression.
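
For context, the sketch below is a minimal, hypothetical illustration of what "blockify vectors into matrices and use BLAS" means; `BlockifySketch` is not Spark code and this is not the actual `InstanceBlock` implementation. Up to `blockSize` instance feature vectors are stacked into one matrix so a single matrix-vector multiply produces all margins for the block, instead of one dot product per row. Like the removed standardization step in the diff below, this simplified form densifies sparse rows.

```scala
import org.apache.spark.ml.linalg.{DenseMatrix, Vector}

object BlockifySketch {

  // Stack up to `blockSize` feature vectors into one row-major dense matrix.
  // Hypothetical helper mirroring the reverted InstanceBlock idea; note that
  // it densifies sparse rows in this simplified form.
  def blockify(features: Seq[Vector], blockSize: Int): Iterator[DenseMatrix] =
    features.grouped(blockSize).map { group =>
      val numRows = group.length
      val numCols = group.head.size
      val values = Array.ofDim[Double](numRows * numCols)
      group.zipWithIndex.foreach { case (vec, i) =>
        vec.foreachNonZero { (j, v) => values(i * numCols + j) = v }
      }
      new DenseMatrix(numRows, numCols, values, true) // isTransposed => row-major
    }

  // One BLAS-backed matrix-vector multiply per block replaces a dot product
  // per row: margins(i) = block row i dot coefficients.
  def margins(block: DenseMatrix, coefficients: Vector): Vector =
    block.multiply(coefficients)
}
```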

### Does this PR introduce any user-facing change?
Yes, the newly added param `blockSize` is removed.
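
For illustration only (hypothetical caller code; the param only ever existed in unreleased 3.0 snapshots), this is what the removed setter looked like to users:

```scala
import org.apache.spark.ml.classification.LinearSVC

// After this revert the commented line no longer compiles and should simply
// be dropped; the other setters are unchanged.
val svc = new LinearSVC()
  .setMaxIter(100)
  .setRegParam(0.01)
// .setBlockSize(1024)  // removed by this revert (default was 1024)
```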

### How was this patch tested?
Reverted test suites.

Closes #27487 from zhengruifeng/revert_blockify_ii.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
zhengruifeng committed on Feb 25, 2020 · parent 52363d4 · commit e19f478
Showing 27 changed files with 260 additions and 1,071 deletions.
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
@@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {

val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold with HasBlockSize {
with HasAggregationDepth with HasThreshold {

/**
* Param for threshold in binary classification prediction.
@@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE

val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}

val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(featuresStd.apply)))
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}

val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

def regParamL1Fun = (index: Int) => 0D
@@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
scaledObjectiveHistory += state.adjustedValue
}

blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}

if (handlePersistence) instances.unpersist()

copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
with HasBlockSize {
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {

import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
@Since("2.2.0")
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

private def assertBoundConstrainedOptimizationParamsValid(
numCoefficientSets: Int,
numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
this
}

override protected[spark] def train(
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}

protected[spark] def train(
dataset: Dataset[_],
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
val instances = extractInstances(dataset)

if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
fitIntercept)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
s"dangerous ground, so the algorithm may not converge.")
}

val featuresMean = summarizer.mean.compressed
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.std.toArray

if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
multinomial = isMultinomial)(_)
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
None
}

val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
state = states.next()
arrayBuilder += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()

if (state == null) {
Expand Down Expand Up @@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
}
}

if (handlePersistence) instances.unpersist()

val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial))

@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {

import MultilayerPerceptronClassifier._

@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)

/**
* Block size for stacking input data in matrices to speed up the computation.
* Data is stacked within partitions. If block size is more than remaining data in
* a partition then it is adjusted to the size of this data.
* Recommended size is between 10 and 1000.
* Default: 128
*
* @group expertParam
*/
@Since("1.5.0")
final val blockSize: IntParam = new IntParam(this, "blockSize",
"Block size for stacking input data in matrices. Data is stacked within partitions." +
" If block size is more than remaining data in a partition then " +
"it is adjusted to the size of this data. Recommended size is between 10 and 1000",
ParamValidators.gt(0))

/** @group expertGetParam */
@Since("1.5.0")
final def getBlockSize: Int = $(blockSize)

/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".