From 255ca222a7b15ca0c9d8293927d8ff67c7c5560a Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Fri, 7 Feb 2020 13:46:13 +0800
Subject: [PATCH] Revert "[SPARK-30662][ML][PYSPARK] ALS/MLP extend
 HasBlockSize"

This reverts commit f59685acaa3e9c227f14fe4d8f9e94a1ac664b05.
---
 .../MultilayerPerceptronClassifier.scala      | 22 ++++++++-
 .../apache/spark/ml/recommendation/ALS.scala  | 46 +++++--------
 python/pyspark/ml/classification.py           | 22 +++++----
 python/pyspark/ml/recommendation.py           | 29 +++---------
 4 files changed, 53 insertions(+), 66 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 6e8f92b9b1e64..c7a8237849b5b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
 
   import MultilayerPerceptronClassifier._
 
@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   @Since("1.5.0")
   final def getLayers: Array[Int] = $(layers)
 
+  /**
+   * Block size for stacking input data in matrices to speed up the computation.
+   * Data is stacked within partitions. If block size is more than remaining data in
+   * a partition then it is adjusted to the size of this data.
+   * Recommended size is between 10 and 1000.
+   * Default: 128
+   *
+   * @group expertParam
+   */
+  @Since("1.5.0")
+  final val blockSize: IntParam = new IntParam(this, "blockSize",
+    "Block size for stacking input data in matrices. Data is stacked within partitions." +
+      " If block size is more than remaining data in a partition then " +
+      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
+    ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  @Since("1.5.0")
+  final def getBlockSize: Int = $(blockSize)
+
   /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 002146f89e79a..2fb9a276be887 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
-  with HasBlockSize {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
 
   /**
    * Param for the column name for user ids. Ids must be integers. Other
   * numeric types are supported for this column, but will be cast to integers as long as they
@@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
-
-  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -291,15 +288,6 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 4096.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -363,7 +351,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
   }
 
   /**
@@ -378,7 +366,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
   }
 
   /**
@@ -389,7 +377,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
   }
 
   /**
@@ -404,7 +392,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
   }
 
   /**
@@ -453,12 +441,11 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int,
-      blockSize: Int): DataFrame = {
+      num: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -496,10 +483,11 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -666,15 +654,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 4096.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
@@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
+      seed, intermediateStorageLevel, finalStorageLevel)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
-      .setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
     copyValues(model)
   }
 
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 5ee42318afd45..bb9cd034808fc 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -2174,7 +2174,7 @@ def sigma(self):
 
 
 class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-                                  HasTol, HasStepSize, HasSolver, HasBlockSize):
+                                  HasTol, HasStepSize, HasSolver):
     """
     Params for :py:class:`MultilayerPerceptronClassifier`.
 
@@ -2185,6 +2185,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
                    "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
                    "neurons and output layer of 10 neurons.",
                    typeConverter=TypeConverters.toListInt)
+    blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
+                      "matrices. Data is stacked within partitions. If block size is more than " +
+                      "remaining data in a partition then it is adjusted to the size of this " +
+                      "data. Recommended size is between 10 and 1000, default is 128.",
+                      typeConverter=TypeConverters.toInt)
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
     initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2197,6 +2202,13 @@ def getLayers(self):
         """
         return self.getOrDefault(self.layers)
 
+    @since("1.6.0")
+    def getBlockSize(self):
+        """
+        Gets the value of blockSize or its default value.
+        """
+        return self.getOrDefault(self.blockSize)
+
     @since("2.0.0")
     def getInitialWeights(self):
         """
@@ -2220,17 +2232,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
     ...     (1.0, Vectors.dense([0.0, 1.0])),
     ...     (1.0, Vectors.dense([1.0, 0.0])),
     ...     (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
-    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
+    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
     >>> mlp.setMaxIter(100)
     MultilayerPerceptronClassifier...
    >>> mlp.getMaxIter()
     100
-    >>> mlp.getBlockSize()
-    128
-    >>> mlp.setBlockSize(1)
-    MultilayerPerceptronClassifier...
-    >>> mlp.getBlockSize()
-    1
     >>> model = mlp.fit(df)
     >>> model.setFeaturesCol("features")
     MultilayerPerceptronClassificationModel...
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index fe571e25c05f5..ee276962c898b 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol, HasBlockSize):
+class _ALSModelParams(HasPredictionCol):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
-    >>> model.getBlockSize()
-    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -284,13 +282,13 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -298,8 +296,7 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
-                         blockSize=4096)
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -309,13 +306,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                   ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                   intermediateStorageLevel="MEMORY_AND_DISK", \
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         Sets params for ALS.
""" kwargs = self._input_kwargs @@ -446,13 +443,6 @@ def setSeed(self, value): """ return self._set(seed=value) - @since("3.0.0") - def setBlockSize(self, value): - """ - Sets the value of :py:attr:`blockSize`. - """ - return self._set(blockSize=value) - class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable): """ @@ -489,13 +479,6 @@ def setPredictionCol(self, value): """ return self._set(predictionCol=value) - @since("3.0.0") - def setBlockSize(self, value): - """ - Sets the value of :py:attr:`blockSize`. - """ - return self._set(blockSize=value) - @property @since("1.4.0") def rank(self):