diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala index b40597c556221..3c2badb528f12 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala @@ -20,22 +20,78 @@ package org.apache.spark.ml.classification import breeze.linalg.{argmax => Bargmax} import org.apache.spark.annotation.Experimental -import org.apache.spark.ml.{PredictionModel, Predictor} -import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.shared.{HasTol, HasMaxIter, HasSeed} +import org.apache.spark.ml.{PredictorParams, PredictionModel, Predictor} +import org.apache.spark.ml.param.{IntParam, ParamValidators, IntArrayParam, ParamMap} import org.apache.spark.ml.util.Identifiable -import org.apache.spark.ml.regression.MultilayerPerceptronParams import org.apache.spark.mllib.ann.{FeedForwardTrainer, FeedForwardTopology} import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.sql.DataFrame -/** - * :: Experimental :: - * Label to vector converter. - */ -@Experimental -private object LabelConverter { +/** Params for Multilayer Perceptron. */ +private[ml] trait MultilayerPerceptronParams extends PredictorParams +  with HasSeed with HasMaxIter with HasTol { + /** + * Layer sizes including input size and output size. At least two sizes, all positive. + * @group param + */ + final val layers: IntArrayParam = new IntArrayParam(this, "layers", + "Sizes of layers from input layer to output layer" + + " E.g., Array(780, 100, 10) means 780 inputs, " + + "one hidden layer with 100 neurons and output layer of 10 neurons.", + // Valid only with at least two layers (input and output), each of positive size. 
+ (sizes: Array[Int]) => sizes.length > 1 && sizes.forall(_ > 0) + ) + + /** @group setParam */ + def setLayers(value: Array[Int]): this.type = set(layers, value) + + /** @group getParam */ + final def getLayers: Array[Int] = $(layers) + + /** + * Block size for stacking input data in matrices. Speeds up the computations. + * Cannot be more than the size of the dataset. + * @group expertParam + */ + final val blockSize: IntParam = new IntParam(this, "blockSize", + "Block size for stacking input data in matrices.", ParamValidators.gt(0)) + + /** @group setParam */ + def setBlockSize(value: Int): this.type = set(blockSize, value) + + /** @group getParam */ + final def getBlockSize: Int = $(blockSize) + /** + * Set the maximum number of iterations. + * Default is 100. + * @group setParam + */ + def setMaxIter(value: Int): this.type = set(maxIter, value) + + /** + * Set the convergence tolerance of iterations. + * Smaller value will lead to higher accuracy with the cost of more iterations. + * Default is 1E-4. + * @group setParam + */ + def setTol(value: Double): this.type = set(tol, value) + + /** + * Set the seed for weights initialization. + * @group setParam + */ + def setSeed(value: Long): this.type = set(seed, value) + + setDefault(maxIter -> 100, tol -> 1e-4, layers -> Array(1, 1), blockSize -> 1) +} + + +/** Label to vector converter. */ +private object LabelConverter { + // TODO: Use OneHotEncoder instead /** * Encodes a label as a vector. 
* Returns a vector of given length with zeroes at all positions @@ -59,7 +115,7 @@ private object LabelConverter { * @return label */ def apply(output: Vector): Double = { - Bargmax(output.toBreeze.toDenseVector).toDouble + output.argmax.toDouble } } @@ -72,14 +128,14 @@ private object LabelConverter { * */ @Experimental -class MultilayerPerceptronClassifier (override val uid: String) +class MultilayerPerceptronClassifier(override val uid: String) extends Predictor[Vector, MultilayerPerceptronClassifier, MultilayerPerceptronClassifierModel] with MultilayerPerceptronParams { - override def copy(extra: ParamMap): MultilayerPerceptronClassifier = defaultCopy(extra) - def this() = this(Identifiable.randomUID("mlpc")) + override def copy(extra: ParamMap): MultilayerPerceptronClassifier = defaultCopy(extra) + /** * Train a model using the given dataset and parameters. * Developers can implement this instead of [[fit()]] to avoid dealing with schema validation @@ -106,11 +162,16 @@ class MultilayerPerceptronClassifier (override val uid: String) * :: Experimental :: * Classifier model based on the Multilayer Perceptron. * Each layer has sigmoid activation function, output layer has softmax. 
+ * @param uid unique identifier of this model instance + * @param layers array of layer sizes including input and output layers + * @param weights vector of weights (or parameters) of the model + * @note the constructor is private[ml], so it cannot be called from outside the ml package */ @Experimental -class MultilayerPerceptronClassifierModel private[ml] (override val uid: String, - layers: Array[Int], - weights: Vector) +class MultilayerPerceptronClassifierModel private[ml]( + override val uid: String, + layers: Array[Int], + weights: Vector) extends PredictionModel[Vector, MultilayerPerceptronClassifierModel] with Serializable { diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 824efa5ed4b28..749d2a47682e5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -166,6 +166,20 @@ object ParamValidators { def inArray[T](allowed: java.util.List[T]): T => Boolean = { (value: T) => allowed.contains(value) } + + /** Casts value to Array[_]; throws IllegalArgumentException if value is not an array. */ + private def getArray[T](value: T): Array[_] = value match { + case x: Array[_] => x + case _ => + // The type should be checked before this is ever called. + throw new IllegalArgumentException("Array Param validation failed because" + + s" of unexpected input type: ${value.getClass}") + } + + /** Check that the array length is strictly greater than lowerBound (throws on non-arrays). */ + def lengthGt[T](lowerBound: Double): T => Boolean = { (value: T) => + getArray(value).length > lowerBound + } } // specialize primitive-typed params because Java doesn't recognize scala.Double, scala.Int, ... 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressor.scala deleted file mode 100644 index 28deeaab8428b..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressor.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.regression - -import breeze.linalg.{argmax => Bargmax} - -import org.apache.spark.Logging -import org.apache.spark.annotation.Experimental -import org.apache.spark.ml.{Model, Transformer, Estimator, PredictorParams} -import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util.Identifiable -import org.apache.spark.mllib.ann.{FeedForwardTopology, FeedForwardTrainer} -import org.apache.spark.mllib.linalg.{VectorUDT, Vector} -import org.apache.spark.sql.{Row, DataFrame} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StructField, StructType} - -/** - * Params for Multilayer Perceptron. 
- */ -private[ml] trait MultilayerPerceptronParams extends PredictorParams -with HasSeed with HasMaxIter with HasTol { - /** - * Layer sizes including input size and output size. - * @group param - */ - final val layers: IntArrayParam = - // TODO: we need IntegerArrayParam! - new IntArrayParam(this, "layers", - "Sizes of layers including input and output from bottom to the top." + - " E.g., Array(780, 100, 10) means 780 inputs, " + - "hidden layer with 100 neurons and output layer of 10 neurons." - // TODO: how to check that array is not empty? - ) - - /** - * Block size for stacking input data in matrices. Speeds up the computations. - * Cannot be more than the size of the dataset. - * @group expertParam - */ - final val blockSize: IntParam = new IntParam(this, "blockSize", - "Block size for stacking input data in matrices.", - ParamValidators.gt(0)) - - /** @group setParam */ - def setLayers(value: Array[Int]): this.type = set(layers, value) - - /** @group getParam */ - final def getLayers: Array[Int] = $(layers) - - /** @group setParam */ - def setBlockSize(value: Int): this.type = set(blockSize, value) - - /** @group getParam */ - final def getBlockSize: Int = $(blockSize) - - /** - * Set the maximum number of iterations. - * Default is 100. - * @group setParam - */ - def setMaxIter(value: Int): this.type = set(maxIter, value) - - /** - * Set the convergence tolerance of iterations. - * Smaller value will lead to higher accuracy with the cost of more iterations. - * Default is 1E-4. - * @group setParam - */ - def setTol(value: Double): this.type = set(tol, value) - - /** - * Set the seed for weights initialization. - * Default is 11L. - * @group setParam - */ - def setSeed(value: Long): this.type = set(seed, value) - - setDefault(seed -> 11L, maxIter -> 100, tol -> 1e-4, layers -> Array(1, 1), blockSize -> 1) -} - -/** - * :: Experimental :: - * Multi-layer perceptron regression. Contains sigmoid activation function on all layers. 
- * See https://en.wikipedia.org/wiki/Multilayer_perceptron for details. - * - */ -@Experimental -class MultilayerPerceptronRegressor (override val uid: String) - extends Estimator[MultilayerPerceptronRegressorModel] - with MultilayerPerceptronParams with HasInputCol with HasOutputCol with HasRawPredictionCol - with Logging { - - /** @group setParam */ - def setInputCol(value: String): this.type = set(inputCol, value) - - /** @group setParam */ - def setOutputCol(value: String): this.type = set(outputCol, value) - - /** - * Fits a model to the input and output data. - * InputCol has to contain input vectors. - * OutputCol has to contain output vectors. - */ - override def fit(dataset: DataFrame): MultilayerPerceptronRegressorModel = { - val data = dataset.select($(inputCol), $(outputCol)).map { - case Row(x: Vector, y: Vector) => (x, y) - } - val myLayers = getLayers - val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, false) - val FeedForwardTrainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last) - FeedForwardTrainer.LBFGSOptimizer.setConvergenceTol(getTol).setNumIterations(getMaxIter) - FeedForwardTrainer.setStackSize(getBlockSize) - val mlpModel = FeedForwardTrainer.train(data) - new MultilayerPerceptronRegressorModel(uid, myLayers, mlpModel.weights()) - } - - /** - * :: DeveloperApi :: - * - * Derives the output schema from the input schema. 
- */ - override def transformSchema(schema: StructType): StructType = { - val inputType = schema($(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${$(inputCol)} must be a vector column") - val outputType = schema($(outputCol)).dataType - require(outputType.isInstanceOf[VectorUDT], - s"Input column ${$(outputCol)} must be a vector column") - require(!schema.fieldNames.contains($(rawPredictionCol)), - s"Output column ${$(rawPredictionCol)} already exists.") - val outputFields = schema.fields :+ StructField($(rawPredictionCol), new VectorUDT, false) - StructType(outputFields) - } - - def this() = this(Identifiable.randomUID("mlpr")) - - override def copy(extra: ParamMap): MultilayerPerceptronRegressor = defaultCopy(extra) -} - -/** - * :: Experimental :: - * Multi-layer perceptron regression model. - * - * @param layers array of layer sizes including input and output - * @param weights weights (or parameters) of the model - */ -@Experimental -class MultilayerPerceptronRegressorModel private[ml] (override val uid: String, - layers: Array[Int], - weights: Vector) - extends Model[MultilayerPerceptronRegressorModel] - with HasInputCol with HasRawPredictionCol { - - private val mlpModel = - FeedForwardTopology.multiLayerPerceptron(layers, false).getInstance(weights) - - /** @group setParam */ - def setInputCol(value: String): this.type = set(inputCol, value) - - /** - * Transforms the input dataset. - * InputCol has to contain input vectors. - * RawPrediction column will contain predictions (outputs of the regressor). - */ - override def transform(dataset: DataFrame): DataFrame = { - transformSchema(dataset.schema, logging = true) - val pcaOp = udf { mlpModel.predict _ } - dataset.withColumn($(rawPredictionCol), pcaOp(col($(inputCol)))) - } - - /** - * :: DeveloperApi :: - * - * Derives the output schema from the input schema. 
- */ - override def transformSchema(schema: StructType): StructType = { - val inputType = schema($(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${$(inputCol)} must be a vector column") - require(!schema.fieldNames.contains($(rawPredictionCol)), - s"Output column ${$(rawPredictionCol)} already exists.") - val outputFields = schema.fields :+ StructField($(rawPredictionCol), new VectorUDT, false) - StructType(outputFields) - } - - override def copy(extra: ParamMap): MultilayerPerceptronRegressorModel = { - copyValues(new MultilayerPerceptronRegressorModel(uid, layers, weights), extra) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/mllib/ann/Layer.scala index 27b1d95794e76..cedaccc6192bc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/ann/Layer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/ann/Layer.scala @@ -534,7 +534,7 @@ object FeedForwardTopology { * @param layerModels models of layers * @param topology topology of the network */ -class FeedForwardModel private(val layerModels: Array[LayerModel], +private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel], val topology: FeedForwardTopology) extends TopologyModel { override def forward(data: BDM[Double]): Array[BDM[Double]] = { val outputs = new Array[BDM[Double]](layerModels.length) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala index 13bf7f707ced9..0d2016daff2dd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala @@ -18,27 +18,25 @@ package org.apache.spark.ml.classification import org.apache.spark.SparkFunSuite +import 
org.apache.spark.mllib.classification.LogisticRegressionSuite._ +import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS +import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.Row class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSparkContext { - test("XOR function learning as 2-class classification problem") { - val inputs = Array[Array[Double]]( - Array[Double](0, 0), - Array[Double](0, 1), - Array[Double](1, 0), - Array[Double](1, 1) - ) - val outputs = Array[Double](0, 1, 1, 0) - val data = inputs.zip(outputs).map{ case(input, output) => - new LabeledPoint(output, Vectors.dense(input))} - val rddData = sc.parallelize(data, 2) + test("XOR function learning as binary classification problem with two outputs.") { + val dataFrame = sqlContext.createDataFrame(Seq( + (Vectors.dense(0.0, 0.0), 0.0), + (Vectors.dense(0.0, 1.0), 1.0), + (Vectors.dense(1.0, 0.0), 1.0), + (Vectors.dense(1.0, 1.0), 0.0)) + ).toDF("features", "label") val layers = Array[Int](2, 5, 2) - val dataFrame = sqlContext.createDataFrame(rddData) - val trainer = new MultilayerPerceptronClassifier("mlpc") + val trainer = new MultilayerPerceptronClassifier() .setLayers(layers) .setBlockSize(1) .setSeed(11L) @@ -46,7 +44,46 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp val model = trainer.fit(dataFrame) val result = model.transform(dataFrame) val predictionAndLabels = result.select("prediction", "label").collect() - assert(predictionAndLabels.forall { case Row (p: Double, l: Double) => - (p - l) == 0 }) + predictionAndLabels.foreach { case Row(p: Double, l: Double) => + assert(p == l) } + } + + test("3 class classification with 2 hidden layers") { + val nPoints = 1000 + + // The 
following weights are taken from OneVsRestSuite.scala + // they represent 3-class iris dataset + val weights = Array( + -0.57997, 0.912083, -0.371077, -0.819866, 2.688191, + -0.16624, -0.84355, -0.048509, -0.301789, 4.170682) + + val xMean = Array(5.843, 3.057, 3.758, 1.199) + val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + val rdd = sc.parallelize(generateMultinomialLogisticInput( + weights, xMean, xVariance, true, nPoints, 42), 2) + val dataFrame = sqlContext.createDataFrame(rdd).toDF("label", "features") + val numClasses = 3 + val numIterations = 100 + val layers = Array[Int](4, 5, 4, numClasses) + val trainer = new MultilayerPerceptronClassifier() + .setLayers(layers) + .setBlockSize(1) + .setSeed(11L) + .setMaxIter(numIterations) + val model = trainer.fit(dataFrame) + val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label") + .map { case Row(p: Double, l: Double) => (p, l) } + // train multinomial logistic regression + val lr = new LogisticRegressionWithLBFGS() + .setIntercept(true) + .setNumClasses(numClasses) + lr.optimizer.setRegParam(0.0) + .setNumIterations(numIterations) + val lrModel = lr.run(rdd) + val lrPredictionAndLabels = lrModel.predict(rdd.map(_.features)).zip(rdd.map(_.label)) + // MLP's predictions should not differ a lot from LR's. 
+ val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels) + val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels) + assert(mlpMetrics.confusionMatrix ~== lrMetrics.confusionMatrix absTol 100) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressorSuite.scala deleted file mode 100644 index eef266e91f8da..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/MultilayerPerceptronRegressorSuite.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.regression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Row -import org.apache.spark.mllib.util.TestingUtils._ - -class MultilayerPerceptronRegressorSuite extends SparkFunSuite with MLlibTestSparkContext { - - test("XOR function learning") { - val inputs = Array[Array[Double]]( - Array[Double](0, 0), - Array[Double](0, 1), - Array[Double](1, 0), - Array[Double](1, 1) - ) - val outputs = Array[Double](0, 1, 1, 0) - val data = inputs.zip(outputs).map { case (features, label) => - (Vectors.dense(features), Vectors.dense(Array(label))) - } - val rddData = sc.parallelize(data, 1) - val dataFrame = sqlContext.createDataFrame(rddData).toDF("inputCol", "outputCol") - val hiddenLayersTopology = Array[Int](5) - val dataSample = rddData.first() - val layerSizes = dataSample._1.size +: hiddenLayersTopology :+ dataSample._2.size - val trainer = new MultilayerPerceptronRegressor("mlpr") - .setInputCol("inputCol") - .setOutputCol("outputCol") - .setBlockSize(1) - .setLayers(layerSizes) - .setMaxIter(100) - .setTol(1e-4) - .setSeed(11L) - val model = trainer.fit(dataFrame) - .setInputCol("inputCol") - model.transform(dataFrame) - .select("rawPrediction", "outputCol").collect().foreach { - case Row(x: Vector, y: Vector) => - assert(x ~== y absTol 1e-3, "Transformed vector is different with expected vector.") - } } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala index 340bcf0a9d60b..24cbb1d1b47a6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext class ANNSuite extends SparkFunSuite with MLlibTestSparkContext { + // TODO: test for weights comparison 
with Weka MLP test("ANN with Sigmoid learns XOR function with LBFGS optimizer") { val inputs = Array[Array[Double]]( Array[Double](0, 0),