Revert "[SPARK-30642][SPARK-30659][SPARK-30660][SPARK-30662]" #27487

Closed · wants to merge 4 commits
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
@@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {

val xTemp = xValues(k) * alpha
while (i < indEnd) {
val rowIndex = Arows(i)
yValues(Arows(i)) += Avals(i) * xTemp
i += 1
}
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
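For context, the BLAS hunk above is the inner accumulation loop of the sparse gemv kernel, y += alpha * A * x with A stored column by column (CSC). The standalone Scala sketch below mirrors the variable names in the diff (colPtrs, Arows, Avals, xValues, yValues); it is an illustration of the loop structure under that assumption, not Spark's BLAS implementation.

```scala
// Standalone sketch of y += alpha * A * x for a CSC (compressed sparse column) matrix.
// Array names mirror the hunk above; illustrative only, not Spark's BLAS code.
object CscGemvSketch {
  def gemv(
      alpha: Double,
      numCols: Int,
      colPtrs: Array[Int],    // column k spans indices colPtrs(k) until colPtrs(k + 1)
      Arows: Array[Int],      // row index of each stored value
      Avals: Array[Double],   // stored values, column by column
      xValues: Array[Double], // dense input vector x
      yValues: Array[Double]  // dense output vector y, updated in place
  ): Unit = {
    var k = 0
    while (k < numCols) {
      var i = colPtrs(k)
      val indEnd = colPtrs(k + 1)
      val xTemp = xValues(k) * alpha
      while (i < indEnd) {
        val rowIndex = Arows(i)
        yValues(rowIndex) += Avals(i) * xTemp
        i += 1
      }
      k += 1
    }
  }

  def main(args: Array[String]): Unit = {
    // 2x2 matrix [[1.0, 0.0], [2.0, 3.0]] in CSC form, x = (1, 1), alpha = 1
    val y = Array(0.0, 0.0)
    gemv(1.0, 2, Array(0, 2, 3), Array(0, 1, 1), Array(1.0, 2.0, 3.0), Array(1.0, 1.0), y)
    println(y.mkString(", ")) // expected: 1.0, 5.0
  }
}
```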
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold with HasBlockSize {
with HasAggregationDepth with HasThreshold {

/**
* Param for threshold in binary classification prediction.
@@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE

val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}

val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(featuresStd.apply)))
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}

val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

def regParamL1Fun = (index: Int) => 0D
@@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
scaledObjectiveHistory += state.adjustedValue
}

blocks.unpersist()
bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}

if (handlePersistence) instances.unpersist()

copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
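The LinearSVC hunks above replace the block-based path, in which features are pre-divided by their standard deviation and stacked into InstanceBlocks, with per-instance aggregation in which the HingeAggregator receives the broadcast std vector and scales on the fly. The sketch below illustrates that on-the-fly scaling for a single instance's hinge-loss contribution; the object, method name, and signature are invented for the example and do not match Spark's HingeAggregator API.

```scala
// Minimal sketch: hinge-loss gradient contribution of one instance, scaling each
// feature by its standard deviation on the fly instead of pre-standardizing the data.
// Purely illustrative; not the signature or behavior of Spark's HingeAggregator.
object HingeOnTheFlySketch {
  // label is expected in {0.0, 1.0}; coefficients has length numFeatures (no intercept here)
  def addInstance(
      features: Array[Double],
      label: Double,
      weight: Double,
      featuresStd: Array[Double],
      coefficients: Array[Double],
      gradientSum: Array[Double]): Double = {
    // margin = sum_j coef(j) * (x(j) / std(j)), skipping features with zero std
    var margin = 0.0
    var j = 0
    while (j < features.length) {
      if (featuresStd(j) != 0.0) margin += coefficients(j) * (features(j) / featuresStd(j))
      j += 1
    }
    // Hinge loss uses labels in {-1, +1}: y = 2 * label - 1
    val y = 2.0 * label - 1.0
    val loss = math.max(0.0, 1.0 - y * margin) * weight
    if (1.0 - y * margin > 0.0) {
      var k = 0
      while (k < features.length) {
        if (featuresStd(k) != 0.0) {
          gradientSum(k) += -y * (features(k) / featuresStd(k)) * weight
        }
        k += 1
      }
    }
    loss
  }
}
```

The removed blokify path pays the division once up front and then works on stacked rows; this revert trades that for per-element scaling inside the aggregator.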
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
with HasBlockSize {
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {

import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
@Since("2.2.0")
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

private def assertBoundConstrainedOptimizationParamsValid(
numCoefficientSets: Int,
numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
this
}

override protected[spark] def train(
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}

protected[spark] def train(
dataset: Dataset[_],
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
val instances = extractInstances(dataset)

if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
fitIntercept)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
s"dangerous ground, so the algorithm may not converge.")
}

val featuresMean = summarizer.mean.compressed
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.std.toArray

if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
multinomial = isMultinomial)(_)
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
None
}

val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
state = states.next()
arrayBuilder += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()

if (state == null) {
@@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
}
}

if (handlePersistence) instances.unpersist()

val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial))

@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
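The LogisticRegression hunks follow the same pattern: the standardization map and the InstanceBlock.blokify call are removed, and the LogisticAggregator again takes the broadcast std vector. For readers unfamiliar with the block layout, here is a minimal sketch of the grouping idea behind blokify, stacking a partition's rows into fixed-size blocks; Row, Block, and blockify are hypothetical names, not Spark's InstanceBlock API.

```scala
// Sketch of grouping a partition's rows into fixed-size blocks, the idea behind the
// removed InstanceBlock.blokify call. Row, Block, and blockify are hypothetical names.
object BlockifySketch {
  final case class Row(label: Double, weight: Double, features: Array[Double])
  final case class Block(labels: Array[Double], weights: Array[Double],
                         matrix: Array[Array[Double]])

  // Stack up to blockSize consecutive rows into one block; the last block of a
  // partition may be smaller than blockSize.
  def blockify(rows: Iterator[Row], blockSize: Int): Iterator[Block] =
    rows.grouped(blockSize).map { group =>
      Block(
        labels = group.map(_.label).toArray,
        weights = group.map(_.weight).toArray,
        matrix = group.map(_.features).toArray)
    }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 5).iterator.map(i => Row((i % 2).toDouble, 1.0, Array(i.toDouble, -i.toDouble)))
    val blocks = blockify(rows, blockSize = 2).toList
    println(blocks.map(_.labels.length)) // List(2, 2, 1)
  }
}
```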

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {

import MultilayerPerceptronClassifier._

@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)

/**
* Block size for stacking input data in matrices to speed up the computation.
* Data is stacked within partitions. If block size is more than remaining data in
* a partition then it is adjusted to the size of this data.
* Recommended size is between 10 and 1000.
* Default: 128
*
* @group expertParam
*/
@Since("1.5.0")
final val blockSize: IntParam = new IntParam(this, "blockSize",
"Block size for stacking input data in matrices. Data is stacked within partitions." +
" If block size is more than remaining data in a partition then " +
"it is adjusted to the size of this data. Recommended size is between 10 and 1000",
ParamValidators.gt(0))

/** @group expertGetParam */
@Since("1.5.0")
final def getBlockSize: Int = $(blockSize)

/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
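The MultilayerPerceptronClassifier hunk moves the blockSize param back from the shared HasBlockSize trait into the trait's own definition, with the documented default of 128 and the within-partition stacking behavior described above. A small end-to-end usage sketch, assuming Spark ML on the classpath and a toy DataFrame built inline:

```scala
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MlpBlockSizeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("mlp-blocksize").getOrCreate()
    import spark.implicits._

    // Tiny toy dataset: 2 features, 2 classes.
    val train = Seq(
      (0.0, Vectors.dense(0.0, 0.0)),
      (1.0, Vectors.dense(1.0, 1.0)),
      (0.0, Vectors.dense(0.1, 0.0)),
      (1.0, Vectors.dense(0.9, 1.0))
    ).toDF("label", "features")

    val mlp = new MultilayerPerceptronClassifier()
      .setLayers(Array(2, 4, 2)) // input size, one hidden layer, number of classes
      .setBlockSize(128)         // rows stacked per matrix within each partition
      .setMaxIter(50)

    val model = mlp.fit(train)
    model.transform(train).select("label", "prediction").show()

    spark.stop()
  }
}
```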