Revert "[SPARK-30662][ML][PYSPARK] ALS/MLP extend HasBlockSize"
This reverts commit f59685a.
zhengruifeng committed Feb 7, 2020
1 parent 3407dc1 commit 255ca22
Showing 4 changed files with 53 additions and 66 deletions.
22 changes: 21 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
- with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
+ with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {

import MultilayerPerceptronClassifier._

@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)

+ /**
+ * Block size for stacking input data in matrices to speed up the computation.
+ * Data is stacked within partitions. If block size is more than remaining data in
+ * a partition then it is adjusted to the size of this data.
+ * Recommended size is between 10 and 1000.
+ * Default: 128
+ *
+ * @group expertParam
+ */
+ @Since("1.5.0")
+ final val blockSize: IntParam = new IntParam(this, "blockSize",
+ "Block size for stacking input data in matrices. Data is stacked within partitions." +
+ " If block size is more than remaining data in a partition then " +
+ "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
+ ParamValidators.gt(0))
+
+ /** @group expertGetParam */
+ @Since("1.5.0")
+ final def getBlockSize: Int = $(blockSize)
+
/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
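The param block restored above gives the MLP its own blockSize again: input rows are stacked into per-partition matrices so the layer computations can run as matrix-matrix operations, with 128 as the default and 10-1000 as the recommended range. A minimal PySpark sketch of the post-revert usage (the toy XOR frame mirrors the doctest in classification.py below; an active SparkSession is assumed):

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy XOR data, as in the classification.py doctest further down.
df = spark.createDataFrame([
    (0.0, Vectors.dense([0.0, 0.0])),
    (1.0, Vectors.dense([0.0, 1.0])),
    (1.0, Vectors.dense([1.0, 0.0])),
    (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])

# After the revert, blockSize is a constructor keyword of the MLP itself
# (default 128) rather than a param inherited from the HasBlockSize mixin.
mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
model = mlp.fit(df)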
46 changes: 12 additions & 34 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
- private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
- with HasBlockSize {
+ private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCol

/** @group expertGetParam */
def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
-
- setDefault(blockSize -> 4096)
}

/**
@@ -291,15 +288,6 @@ class ALSModel private[ml] (
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

- /**
- * Set block size for stacking input data in matrices.
- * Default is 4096.
- *
- * @group expertSetParam
- */
- @Since("3.0.0")
- def setBlockSize(value: Int): this.type = set(blockSize, value)
-
private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
if (featuresA != null && featuresB != null) {
var dotProduct = 0.0f
@@ -363,7 +351,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllUsers(numItems: Int): DataFrame = {
- recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+ recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
}

/**
@@ -378,7 +366,7 @@
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
- recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+ recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
}

/**
Expand All @@ -389,7 +377,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllItems(numUsers: Int): DataFrame = {
- recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+ recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
}

/**
Expand All @@ -404,7 +392,7 @@ class ALSModel private[ml] (
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
- recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+ recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
}

/**
@@ -453,12 +441,11 @@ class ALSModel private[ml] (
dstFactors: DataFrame,
srcOutputColumn: String,
dstOutputColumn: String,
- num: Int,
- blockSize: Int): DataFrame = {
+ num: Int): DataFrame = {
import srcFactors.sparkSession.implicits._

- val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
- val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
+ val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+ val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap { case (srcIter, dstIter) =>
@@ -496,10 +483,11 @@

/**
* Blockifies factors to improve the efficiency of cross join
+ * TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
factors: Dataset[(Int, Array[Float])],
- blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
+ blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
import factors.sparkSession.implicits._
factors.mapPartitions(_.grouped(blockSize))
}
@@ -666,15 +654,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

- /**
- * Set block size for stacking input data in matrices.
- * Default is 4096.
- *
- * @group expertSetParam
- */
- @Since("3.0.0")
- def setBlockSize(value: Int): this.type = set(blockSize, value)
-
/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*
@@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
instr.logDataset(dataset)
instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
- seed, intermediateStorageLevel, finalStorageLevel, blockSize)
+ seed, intermediateStorageLevel, finalStorageLevel)

val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
- val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
- .setParent(this)
+ val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
copyValues(model)
}

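For intuition about the Scala change above: blockify groups each partition's (id, factor-array) rows into fixed-size chunks, so the crossJoin in recommendForAll scores whole blocks of users against whole blocks of items instead of single rows; after the revert the chunk size is the hard-coded 4096 default rather than a user-settable param. A rough Python sketch of the grouping idea behind factors.mapPartitions(_.grouped(blockSize)) -- an illustrative analogue, not the shipped implementation:

from itertools import islice

def grouped(it, size):
    # Yield successive lists of at most `size` items, mirroring
    # Scala's Iterator.grouped as used by blockify.
    it = iter(it)
    while True:
        block = list(islice(it, size))
        if not block:
            return
        yield block

# Standalone demo of the chunking behaviour:
print(list(grouped(range(10), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# Inside Spark this would run once per partition, e.g. (hypothetical):
#   blocked = factors.rdd.mapPartitions(lambda part: grouped(part, 4096))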
22 changes: 14 additions & 8 deletions python/pyspark/ml/classification.py
@@ -2174,7 +2174,7 @@ def sigma(self):


class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
- HasTol, HasStepSize, HasSolver, HasBlockSize):
+ HasTol, HasStepSize, HasSolver):
"""
Params for :py:class:`MultilayerPerceptronClassifier`.
@@ -2185,6 +2185,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
"E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
"neurons and output layer of 10 neurons.",
typeConverter=TypeConverters.toListInt)
+ blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
+ "matrices. Data is stacked within partitions. If block size is more than " +
+ "remaining data in a partition then it is adjusted to the size of this " +
+ "data. Recommended size is between 10 and 1000, default is 128.",
+ typeConverter=TypeConverters.toInt)
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2197,6 +2202,13 @@ def getLayers(self):
"""
return self.getOrDefault(self.layers)

@since("1.6.0")
def getBlockSize(self):
"""
Gets the value of blockSize or its default value.
"""
return self.getOrDefault(self.blockSize)

@since("2.0.0")
def getInitialWeights(self):
"""
@@ -2220,17 +2232,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPerceptronParams,
... (1.0, Vectors.dense([0.0, 1.0])),
... (1.0, Vectors.dense([1.0, 0.0])),
... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
>>> mlp.setMaxIter(100)
MultilayerPerceptronClassifier...
>>> mlp.getMaxIter()
100
>>> mlp.getBlockSize()
128
>>> mlp.setBlockSize(1)
MultilayerPerceptronClassifier...
>>> mlp.getBlockSize()
1
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...
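One practical consequence of the doctest change above: with HasBlockSize reverted, the Python classifier takes blockSize as a constructor keyword, and getBlockSize reports the documented default of 128 when nothing is set. A small hedged sketch, assuming the restored defaults described in the param text:

from pyspark.ml.classification import MultilayerPerceptronClassifier

mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
print(mlp.getBlockSize())  # 128 -- the documented default

mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
print(mlp.getBlockSize())  # 1 -- explicit constructor value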
29 changes: 6 additions & 23 deletions python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@


@inherit_doc
- class _ALSModelParams(HasPredictionCol, HasBlockSize):
+ class _ALSModelParams(HasPredictionCol):
"""
Params for :py:class:`ALS` and :py:class:`ALSModel`.
@@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
>>> model.getBlockSize()
4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
@@ -284,22 +282,21 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", lockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
"""
super(ALS, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
blockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -309,13 +306,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
Sets params for ALS.
"""
kwargs = self._input_kwargs
@@ -446,13 +443,6 @@ def setSeed(self, value):
"""
return self._set(seed=value)

@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)


class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
@@ -489,13 +479,6 @@ def setPredictionCol(self, value):
"""
return self._set(predictionCol=value)

@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)

@property
@since("1.4.0")
def rank(self):
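With the revert applied, ALS on the Python side no longer accepts a blockSize kwarg at all; the block size is purely internal to the recommend methods. A minimal end-to-end sketch under the post-revert API (the toy ratings frame and the running SparkSession are assumptions for illustration):

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
ratings = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
    ["user", "item", "rating"])

# No blockSize kwarg here any more; recommendForAll* internally
# blockifies the factor datasets with the hard-coded 4096 default.
als = ALS(rank=10, maxIter=5, seed=0, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(ratings)
top2 = model.recommendForAllUsers(2)  # top-2 items per user
top2.show(truncate=False)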
