Revert "[SPARK-30659][ML][PYSPARK] LogisticRegression blockify input …
Browse files Browse the repository at this point in the history
…vectors"

This reverts commit 073ce12.
zhengruifeng committed Feb 7, 2020
1 parent 255ca22 commit 0dafab5
Showing 12 changed files with 113 additions and 350 deletions.
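For context on the change being reverted: "blockify" stacks a batch of (standardized) instance vectors into a single InstanceBlock matrix, so the loss aggregators can compute a whole block of dot products with one BLAS gemv call instead of one dot product per row, as the HingeAggregator hunks further down show. A minimal standalone sketch of that idea, written against Breeze because Spark's own BLAS helper is Spark-internal; all names below are illustrative and not taken from this commit:

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    // Two instances with three features each, plus a coefficient vector.
    val rows = Array(Array(1.0, 0.0, 2.0), Array(0.0, 3.0, 1.0))
    val coef = BDV(0.5, -1.0, 0.25)

    // Per-row path: one dot product per instance.
    val perRow = rows.map(r => BDV(r) dot coef)

    // Blockified path: stack the rows into one matrix and obtain every margin
    // from a single matrix-vector multiply (the role BLAS.gemv plays on an InstanceBlock).
    val block = BDM.tabulate(rows.length, coef.length)((i, j) => rows(i)(j))
    val margins = block * coef   // same values as perRow, computed in one call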
@@ -157,7 +157,7 @@ class LinearSVC @Since("2.2.0") (

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
* Default is 4096.
*
* @group expertSetParam
*/
@@ -240,7 +240,7 @@ class LinearSVC @Since("2.2.0") (
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
$(aggregationDepth))

@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
with HasBlockSize {
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {

import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
@Since("2.2.0")
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)

/**
* Set block size for stacking input data in matrices.
* Default is 1024.
*
* @group expertSetParam
*/
@Since("3.0.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)

private def assertBoundConstrainedOptimizationParamsValid(
numCoefficientSets: Int,
numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@
this
}

override protected[spark] def train(
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}

protected[spark] def train(
dataset: Dataset[_],
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
val instances = extractInstances(dataset)

if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
fitIntercept)

val sc = dataset.sparkSession.sparkContext
val instances = extractInstances(dataset)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
s"dangerous ground, so the algorithm may not converge.")
}

val featuresMean = summarizer.mean.compressed
val featuresStd = summarizer.std.compressed
val bcFeaturesStd = sc.broadcast(featuresStd)
val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.std.toArray

if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
multinomial = isMultinomial)(_)
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@
None
}

val standardized = instances.map {
case Instance(label, weight, features) =>
val featuresStd = bcFeaturesStd.value
val array = Array.ofDim[Double](numFeatures)
features.foreachNonZero { (i, v) =>
val std = featuresStd(i)
if (std != 0) array(i) = v / std
}
Instance(label, weight, Vectors.dense(array))
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
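The costFun restored here is a Breeze DiffFunction; the following hunk drains the optimizer's states iterator and records adjustedValue at each step. A rough sketch of that driving loop against a toy objective (the optimizer construction below is an illustration under my own assumptions, not code copied from this file):

    import breeze.linalg.{DenseVector => BDV}
    import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS}

    // Toy convex objective standing in for the RDDLossFunction: f(x) = ||x - 3||^2.
    val costFun = new DiffFunction[BDV[Double]] {
      override def calculate(x: BDV[Double]): (Double, BDV[Double]) = {
        val diff = x - 3.0
        (diff dot diff, diff * 2.0)
      }
    }

    val optimizer = new LBFGS[BDV[Double]](100, 10, 1e-6)   // maxIter, history size, tolerance
    val states = optimizer.iterations(new CachedDiffFunction(costFun), BDV.zeros[Double](5))

    // Drain the iterator as the next hunk does, keeping the last state and the loss history.
    val history = scala.collection.mutable.ArrayBuffer.empty[Double]
    var state: optimizer.State = null
    while (states.hasNext) {
      state = states.next()
      history += state.adjustedValue
    }
    val solution = state.x   // converges towards a vector of 3.0s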
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
state = states.next()
arrayBuilder += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()

if (state == null) {
@@ -893,6 +875,8 @@
}
}

if (handlePersistence) instances.unpersist()

val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial))
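The restored train(dataset, handlePersistence) overload also brings back the caching convention used throughout this method: persist the derived instance RDD only when the caller has not cached the input, and unpersist it once training finishes. A condensed sketch of that pattern (the helper and its try/finally framing are mine, not Spark's):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Dataset
    import org.apache.spark.storage.StorageLevel

    // Cache the derived RDD only when the input Dataset itself is not cached,
    // and release it after the training body has run.
    def trainWithCaching[R](dataset: Dataset[_], instances: RDD[_])(body: => R): R = {
      val handlePersistence = dataset.storageLevel == StorageLevel.NONE
      if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
      try body
      finally if (handlePersistence) instances.unpersist()
    }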

@@ -35,7 +35,8 @@ import org.apache.spark.ml.linalg._
*/
private[ml] class HingeAggregator(
numFeatures: Int,
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
fitIntercept: Boolean,
blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] {

private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
@@ -54,6 +55,20 @@ private[ml] class HingeAggregator(
}
}

@transient private lazy val intercept =
if (fitIntercept) coefficientsArray(numFeatures) else 0.0

@transient private lazy val linearGradSumVec = {
if (fitIntercept) {
new DenseVector(Array.ofDim[Double](numFeatures))
} else {
null
}
}

@transient private lazy val auxiliaryVec =
new DenseVector(Array.ofDim[Double](blockSize))


/**
* Add a new training instance to this HingeAggregator, and update the loss and gradient
@@ -123,14 +138,19 @@ private[ml] class HingeAggregator(
val localGradientSumArray = gradientSumArray

// vec here represents dotProducts
val vec = if (fitIntercept && coefficientsArray.last != 0) {
val intercept = coefficientsArray.last
new DenseVector(Array.fill(size)(intercept))
val vec = if (size == blockSize) {
auxiliaryVec
} else {
// the last block within one partition may be of size less than blockSize
new DenseVector(Array.ofDim[Double](size))
}

if (fitIntercept) {
var i = 0
while (i < size) {
vec.values(i) = intercept
i += 1
}
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
} else {
BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
@@ -165,9 +185,6 @@ private[ml] class HingeAggregator(
if (vec.values.forall(_ == 0)) return this

if (fitIntercept) {
// localGradientSumArray is of size numFeatures+1, so can not
// be directly used as the output of BLAS.gemv
val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
localGradientSumArray(numFeatures) += vec.values.sum
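Taken together, these HingeAggregator hunks restore the block-level forward/backward pattern: one gemv over the block matrix yields every dot product, the hinge derivative adjusts them element-wise, and one gemv over the transposed matrix accumulates the coefficient gradient (the intercept gradient is just the sum). A self-contained Breeze sketch of that pattern, with illustrative names and instance weights omitted:

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    // One block: four instances, three (already standardized) features, labels in {0, 1}.
    val block  = BDM.tabulate(4, 3)((i, j) => (i + j + 1).toDouble)
    val labels = BDV(1.0, 0.0, 1.0, 0.0)
    val linear = BDV(0.1, -0.2, 0.05)
    val intercept = 0.3

    // Forward: a single matrix-vector multiply gives every dot product in the block.
    val dots = (block * linear) + intercept

    // Hinge loss maps labels to {-1, 1}; the derivative w.r.t. the dot product is -label
    // whenever 1 - label * dot is positive, and 0 otherwise.
    val dLdDot = BDV.tabulate(labels.length) { i =>
      val y = 2.0 * labels(i) - 1.0
      if (1.0 - y * dots(i) > 0) -y else 0.0
    }

    // Backward: a single transposed multiply accumulates the coefficient gradient.
    val gradCoef = block.t * dLdDot
    val gradIntercept = breeze.linalg.sum(dLdDot)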