From f69bb3db2e7d370206fea87a81ccfbf6ab5fe54a Mon Sep 17 00:00:00 2001
From: Alexander Ulanov
Date: Thu, 30 Jul 2015 03:23:38 -0700
Subject: [PATCH] Addressing reviewers' comments.

---
 .../org/apache/spark/ml/ann/BreezeUtil.scala  | 14 +--
 .../scala/org/apache/spark/ml/ann/Layer.scala | 98 ++++++++++++-------
 .../MultilayerPerceptronClassifier.scala      | 30 +++---
 .../org/apache/spark/ml/param/params.scala    | 20 ++--
 .../org/apache/spark/ml/ann/ANNSuite.scala    | 30 +++---
 .../MultilayerPerceptronClassifierSuite.scala |  1 +
 6 files changed, 106 insertions(+), 87 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
index ca049dcb7fffd..7429f9d652ac5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
@@ -23,8 +23,9 @@ import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS}
 /**
  * In-place DGEMM and DGEMV for Breeze
  */
-object BreezeUtil {
+private[ann] object BreezeUtil {
 
+  // TODO: switch to MLlib BLAS interface
   private def transposeString(a: BDM[Double]): String = if (a.isTranspose) "T" else "N"
 
   /**
@@ -40,12 +41,9 @@ object BreezeUtil {
     require(a.cols == b.rows, "A & B Dimension mismatch!")
     require(a.rows == c.rows, "A & C Dimension mismatch!")
     require(b.cols == c.cols, "B & C Dimension mismatch!")
-    if(a.rows == 0 || b.rows == 0 || a.cols == 0 || b.cols == 0) {
-    } else {
-      NativeBLAS.dgemm(transposeString(a), transposeString(b), c.rows, c.cols, a.cols,
-        alpha, a.data, a.offset, a.majorStride, b.data, b.offset, b.majorStride,
-        beta, c.data, c.offset, c.rows)
-    }
+    NativeBLAS.dgemm(transposeString(a), transposeString(b), c.rows, c.cols, a.cols,
+      alpha, a.data, a.offset, a.majorStride, b.data, b.offset, b.majorStride,
+      beta, c.data, c.offset, c.rows)
   }
 
   /**
@@ -57,9 +55,7 @@ object BreezeUtil {
    * @param y y
    */
   def dgemv(alpha: Double, a: BDM[Double], x: BDV[Double], beta: Double, y: BDV[Double]): Unit = {
-    require(a.cols == x.length, "A & b Dimension mismatch!")
-
     NativeBLAS.dgemv(transposeString(a), a.rows, a.cols,
       alpha, a.data, a.offset, a.majorStride,
       x.data, x.offset, x.stride,
       beta, y.data, y.offset, y.stride)
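For reference, a minimal sketch (not part of the patch) of what the slimmed-down wrappers compute: dgemm overwrites C with alpha * A * B + beta * C, and dgemv overwrites y with alpha * A * x + beta * y, in place in the target's backing array. The shapes and values below are hypothetical, and since the patch makes BreezeUtil private[ann], the snippet only compiles inside org.apache.spark.ml.ann:

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    val a = new BDM(2, 3, Array(1.0, 4.0, 2.0, 5.0, 3.0, 6.0)) // 2x3, column-major backing array
    val b = new BDM(3, 2, Array(1.0, 0.0, 1.0, 0.0, 1.0, 1.0)) // 3x2
    val c = BDM.zeros[Double](2, 2)                            // 2x2, receives A * B
    BreezeUtil.dgemm(1.0, a, b, 0.0, c)                        // c := 1.0 * A * B + 0.0 * C

    val x = BDV(1.0, 1.0, 1.0)                                 // length a.cols
    val y = BDV.zeros[Double](2)                               // length a.rows
    BreezeUtil.dgemv(1.0, a, x, 0.0, y)                        // y := 1.0 * A * x + 0.0 * y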
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index 422cebe343db2..98168a5c7ad35 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.ml.ann
 
-import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => brzAxpy,
-sum => Bsum}
+import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => Baxpy,
+  sum => Bsum}
 import breeze.numerics.{log => Blog, sigmoid => Bsigmoid}
+
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.rdd.RDD
@@ -177,8 +178,11 @@ private[ann] object AffineLayerModel {
    * @param numOut number of layer outputs
    * @return matrix A and vector b
    */
-  def unroll(weights: Vector, position: Int,
-    numIn: Int, numOut: Int): (BDM[Double], BDV[Double]) = {
+  def unroll(
+      weights: Vector,
+      position: Int,
+      numIn: Int,
+      numOut: Int): (BDM[Double], BDV[Double]) = {
     val weightsCopy = weights.toArray
     // TODO: the array is not copied to BDMs, make sure this is OK!
     val a = new BDM[Double](numOut, numIn, weightsCopy, position)
@@ -272,8 +276,11 @@ private[ann] object ActivationFunction {
     }
   }
 
-  def apply(x1: BDM[Double], x2: BDM[Double], y: BDM[Double],
-    func: (Double, Double) => Double): Unit = {
+  def apply(
+      x1: BDM[Double],
+      x2: BDM[Double],
+      y: BDM[Double],
+      func: (Double, Double) => Double): Unit = {
     var i = 0
     while (i < x1.rows) {
       var j = 0
@@ -284,7 +291,6 @@ private[ann] object ActivationFunction {
       i += 1
     }
   }
-
 }
 
 /**
@@ -320,8 +326,10 @@ private[ann] class SoftmaxFunction extends ActivationFunction {
     }
   }
 
-  override def crossEntropy(output: BDM[Double], target: BDM[Double],
-    result: BDM[Double]): Double = {
+  override def crossEntropy(
+      output: BDM[Double],
+      target: BDM[Double],
+      result: BDM[Double]): Double = {
     def m(o: Double, t: Double): Double = o - t
     ActivationFunction(output, target, result, m)
     -Bsum( target :* Blog(output)) / output.cols
@@ -346,11 +354,13 @@ private[ann] class SigmoidFunction extends ActivationFunction {
     ActivationFunction(x, y, s)
   }
 
-  override def crossEntropy(output: BDM[Double], target: BDM[Double],
-    result: BDM[Double]): Double = {
+  override def crossEntropy(
+      output: BDM[Double],
+      target: BDM[Double],
+      result: BDM[Double]): Double = {
     def m(o: Double, t: Double): Double = o - t
     ActivationFunction(output, target, result, m)
-    -Bsum( target :* Blog(output)) / output.cols
+    -Bsum(target :* Blog(output)) / output.cols
   }
 
   override def derivative(x: BDM[Double], y: BDM[Double]): Unit = {
@@ -384,13 +394,17 @@ private[ann] class FunctionalLayer (val activationFunction: ActivationFunction)
  * Functional layer model. Holds no weights.
  * @param activationFunction activation function
  */
-private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction
-  ) extends LayerModel {
+private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction)
+  extends LayerModel {
   val size = 0
-
+  // matrices for in-place computations
+  // outputs
   private var f: BDM[Double] = null
+  // delta
   private var d: BDM[Double] = null
+  // matrix for error computation
   private var e: BDM[Double] = null
+  // delta gradient
   private lazy val dg = new Array[Double](0)
 
   override def eval(data: BDM[Double]): BDM[Double] = {
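Both crossEntropy implementations above now share the same shape: write (output - target) into result and return the batch-averaged cross entropy. A standalone restatement of that formula in plain Breeze (matrix values hypothetical; each column is one example, as in the code above):

    import breeze.linalg.{sum, DenseMatrix => BDM}
    import breeze.numerics.{log => Blog}

    val output = new BDM(2, 2, Array(0.9, 0.1, 0.2, 0.8)) // network outputs, column-major
    val target = new BDM(2, 2, Array(1.0, 0.0, 0.0, 1.0)) // one-hot targets
    val error = output - target                           // what gets written into result
    val crossEntropy = -sum(target :* Blog(output)) / output.cols
    // here: -(log 0.9 + log 0.8) / 2, the mean cross entropy over the two columns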
@@ -487,7 +501,7 @@ private[ann] trait TopologyModel extends Serializable{
  * Feed forward ANN
  * @param layers
  */
-class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
+private[ann] class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
   override def getInstance(weights: Vector): TopologyModel = FeedForwardModel(this, weights)
 
   override def getInstance(seed: Long): TopologyModel = FeedForwardModel(this, seed)
@@ -496,7 +510,7 @@ class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
 /**
  * Factory for some of the frequently-used topologies
  */
-object FeedForwardTopology {
+private[ml] object FeedForwardTopology {
   /**
    * Creates a feed forward topology from the array of layers
    * @param layers array of layers
@@ -534,19 +548,23 @@ object FeedForwardTopology {
  * @param layerModels models of layers
  * @param topology topology of the network
  */
-private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel],
-  val topology: FeedForwardTopology) extends TopologyModel {
+private[ml] class FeedForwardModel private(
+    val layerModels: Array[LayerModel],
+    val topology: FeedForwardTopology) extends TopologyModel {
   override def forward(data: BDM[Double]): Array[BDM[Double]] = {
     val outputs = new Array[BDM[Double]](layerModels.length)
     outputs(0) = layerModels(0).eval(data)
-    for(i <- 1 until layerModels.length){
+    for (i <- 1 until layerModels.length) {
       outputs(i) = layerModels(i).eval(outputs(i-1))
     }
     outputs
   }
 
-  override def computeGradient(data: BDM[Double], target: BDM[Double], cumGradient: Vector,
-    realBatchSize: Int): Double = {
+  override def computeGradient(
+      data: BDM[Double],
+      target: BDM[Double],
+      cumGradient: Vector,
+      realBatchSize: Int): Double = {
     val outputs = forward(data)
     val deltas = new Array[BDM[Double]](layerModels.length)
     val L = layerModels.length - 1
@@ -585,12 +603,12 @@ private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel]
   override def weights(): Vector = {
     // TODO: extract roll
     var size = 0
-    for(i <- 0 until layerModels.length) {
+    for (i <- 0 until layerModels.length) {
       size += layerModels(i).size
     }
     val array = new Array[Double](size)
     var offset = 0
-    for(i <- 0 until layerModels.length) {
+    for (i <- 0 until layerModels.length) {
       val layerWeights = layerModels(i).weights().toArray
       System.arraycopy(layerWeights, 0, array, offset, layerWeights.length)
       offset += layerWeights.length
@@ -620,7 +638,7 @@ private[ann] object FeedForwardModel {
     val layers = topology.layers
     val layerModels = new Array[LayerModel](layers.length)
     var offset = 0
-    for(i <- 0 until layers.length){
+    for (i <- 0 until layers.length) {
       layerModels(i) = layers(i).getInstance(weights, offset)
       offset += layerModels(i).size
     }
@@ -658,8 +676,11 @@ private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) ext
     (gradient, loss)
   }
 
-  override def compute(data: Vector, label: Double, weights: Vector,
-    cumGradient: Vector): Double = {
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
     val (input, target, realBatchSize) = dataStacker.unstack(data)
     val model = topology.getInstance(weights)
     model.computeGradient(input, target, cumGradient, realBatchSize)
@@ -684,12 +705,13 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
    */
   def stack(data: RDD[(Vector, Vector)]): RDD[(Double, Vector)] = {
     val stackedData = if (stackSize == 1) {
-      data.map(v =>
+      data.map { v =>
         (0.0,
           Vectors.fromBreeze(BDV.vertcat(
             v._1.toBreeze.toDenseVector,
             v._2.toBreeze.toDenseVector))
-          ))
+          )
+      }
     } else {
       data.mapPartitions { it =>
         it.grouped(stackSize).map { seq =>
@@ -728,14 +749,15 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
  */
 private[ann] class ANNUpdater extends Updater {
 
-  override def compute(weightsOld: Vector,
-    gradient: Vector,
-    stepSize: Double,
-    iter: Int,
-    regParam: Double): (Vector, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize
     val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
-    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+    Baxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
     (Vectors.fromBreeze(brzWeights), 0)
   }
 }
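DataStacker exists so that (input, target) vector pairs can ride through MLlib's Gradient/Updater machinery, which expects (Double, Vector) records: each pair is flattened into one vector behind a dummy 0.0 label, and unstack later re-carves the input and target matrices using inputSize and outputSize. A sketch of the stackSize == 1 layout (sizes hypothetical, concatenation written with plain arrays rather than the package-private toBreeze used above):

    import org.apache.spark.mllib.linalg.Vectors

    val features = Array(0.0, 1.0) // inputSize = 2
    val target = Array(1.0)        // outputSize = 1
    // stack: dummy label plus features and target concatenated into one flat vector
    val stacked = (0.0, Vectors.dense(features ++ target))
    // unstack recovers a 2 x batch input matrix and a 1 x batch target matrix from such rows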
@@ -746,8 +768,10 @@ private[ann] class ANNUpdater extends Updater {
  * @param inputSize input size
  * @param outputSize output size
  */
-private[ml] class FeedForwardTrainer (topology: Topology, val inputSize: Int,
-  val outputSize: Int) extends Serializable {
+private[ml] class FeedForwardTrainer(
+    topology: Topology,
+    val inputSize: Int,
+    val outputSize: Int) extends Serializable {
 
   // TODO: what if we need to pass random seed?
   private var _weights = topology.getInstance(11L).weights()
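With Layer.scala done, the pieces compose the way the XOR tests in ANNSuite below exercise them. A minimal sketch (layer sizes and data are hypothetical; train and predict come from the surrounding trainer/model API that this patch leaves unchanged, and the private[ml] scoping means this only compiles under org.apache.spark.ml):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    // rddData: RDD[(Vector, Vector)] of (features, encoded target) pairs, built elsewhere
    def trainXor(rddData: RDD[(Vector, Vector)]): Vector = {
      val layerSizes = Array(2, 5, 1) // 2 inputs, one hidden layer of 5 neurons, 1 output
      val topology = FeedForwardTopology.multiLayerPerceptron(layerSizes, false)
      val trainer = new FeedForwardTrainer(topology, layerSizes(0), layerSizes.last)
      trainer.LBFGSOptimizer.setConvergenceTol(1e-4).setNumIterations(20)
      val model = trainer.train(rddData)
      model.predict(Vectors.dense(0.0, 1.0)) // raw output vector for one input
    }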
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 8702abf964246..8b608e2c3b50b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.DataFrame
 
 /** Params for Multilayer Perceptron. */
 private[ml] trait MultilayerPerceptronParams extends PredictorParams
-with HasSeed with HasMaxIter with HasTol {
+  with HasSeed with HasMaxIter with HasTol {
   /**
    * Layer sizes including input size and output size.
    * @group param
@@ -39,7 +39,7 @@ with HasSeed with HasMaxIter with HasTol {
     " E.g., Array(780, 100, 10) means 780 inputs, " +
     "one hidden layer with 100 neurons and output layer of 10 neurons.",
     // TODO: how to check ALSO that all elements are greater than 0?
-    ParamValidators.lengthGt(1)
+    ParamValidators.arrayLengthGt(1)
   )
 
   /** @group setParam */
@@ -94,12 +94,12 @@ private object LabelConverter {
    * Returns a vector of given length with zeroes at all positions
    * and value 1.0 at the position that corresponds to the label.
    *
-   * @param labeledPoint labeled point
+   * @param labeledPoint labeled point
    * @param labelCount total number of labels
-   * @return vector encoding of a label
+   * @return pair of features and vector encoding of a label
    */
-  def apply(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
-    val output = Array.fill(labelCount){0.0}
+  def encodeLabeledPoint(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
+    val output = Array.fill(labelCount)(0.0)
     output(labeledPoint.label.toInt) = 1.0
     (labeledPoint.features, Vectors.dense(output))
   }
@@ -108,10 +108,10 @@ private object LabelConverter {
    * Converts a vector to a label.
    * Returns the position of the maximal element of a vector.
    *
-   * @param output label encoded with a vector
-   * @return label
+   * @param output label encoded with a vector
+   * @return label
    */
-  def apply(output: Vector): Double = {
+  def decodeLabel(output: Vector): Double = {
     output.argmax.toDouble
   }
 }
@@ -138,14 +138,14 @@ class MultilayerPerceptronClassifier(override val uid: String)
    * Developers can implement this instead of [[fit()]] to avoid dealing with schema validation
    * and copying parameters into the model.
    *
-   * @param dataset Training dataset
-   * @return Fitted model
+   * @param dataset Training dataset
+   * @return Fitted model
    */
   override protected def train(dataset: DataFrame): MultilayerPerceptronClassifierModel = {
-    val labels = getLayers.last.toInt
+    val myLayers = $(layers)
+    val labels = myLayers.last
     val lpData = extractLabeledPoints(dataset)
-    val data = lpData.map(lp => LabelConverter(lp, labels))
-    val myLayers = getLayers.map(_.toInt)
+    val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
     val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, true)
     val FeedForwardTrainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
     FeedForwardTrainer.LBFGSOptimizer.setConvergenceTol(getTol).setNumIterations(getMaxIter)
@@ -179,7 +179,7 @@ class MultilayerPerceptronClassifierModel private[ml](
    * This internal method is used to implement [[transform()]] and output [[predictionCol]].
    */
   override protected def predict(features: Vector): Double = {
-    LabelConverter(mlpModel.predict(features))
+    LabelConverter.decodeLabel(mlpModel.predict(features))
   }
 
   override def copy(extra: ParamMap): MultilayerPerceptronClassifierModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index 749d2a47682e5..5e1855d6a50b9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -167,18 +167,16 @@ object ParamValidators {
     allowed.contains(value)
   }
 
-  /** Private method for checking array types and converting to Array. */
-  private def getArray[T](value: T): Array[_] = value match {
-    case x: Array[_] => x
-    case _ =>
-      // The type should be checked before this is ever called.
-      throw new IllegalArgumentException("Array Param validation failed because" +
-        s" of unexpected input type: ${value.getClass}")
-  }
-
   /** Check that the array length is greater than lowerBound. */
-  def lengthGt[T](lowerBound: Double): T => Boolean = { (value: T) =>
-    getArray(value).length > lowerBound
+  def arrayLengthGt[T](lowerBound: Double): T => Boolean = { (value: T) =>
+    val array: Array[_] = value match {
+      case x: Array[_] => x
+      case _ =>
+        // The type should be checked before this is ever called.
+        throw new IllegalArgumentException("Array Param validation failed because" +
+          s" of unexpected input type: ${value.getClass}")
+    }
+    array.length > lowerBound
   }
 }
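For illustration, the closure the renamed validator returns behaves like this (values hypothetical, mirroring the layers param above):

    import org.apache.spark.ml.param.ParamValidators

    val validLayers = ParamValidators.arrayLengthGt[Array[Int]](1)
    validLayers(Array(780, 100, 10)) // true: at least an input and an output size
    validLayers(Array(780))          // false: a single size is rejected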
diff --git a/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
index fcda6c64a3c0b..449288a48da65 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
@@ -26,17 +26,17 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
   // TODO: test for weights comparison with Weka MLP
   test("ANN with Sigmoid learns XOR function with LBFGS optimizer") {
     val inputs = Array[Array[Double]](
-      Array[Double](0, 0),
-      Array[Double](0, 1),
-      Array[Double](1, 0),
-      Array[Double](1, 1)
+      Array(0.0, 0.0),
+      Array(0.0, 1.0),
+      Array(1.0, 0.0),
+      Array(1.0, 1.0)
     )
-    val outputs = Array[Double](0, 1, 1, 0)
+    val outputs = Array(0.0, 1.0, 1.0, 0.0)
     val data = inputs.zip(outputs).map { case (features, label) =>
       (Vectors.dense(features), Vectors.dense(Array(label)))
     }
     val rddData = sc.parallelize(data, 1)
-    val hiddenLayersTopology = Array[Int](5)
+    val hiddenLayersTopology = Array(5)
     val dataSample = rddData.first()
     val layerSizes = dataSample._1.size +: hiddenLayersTopology :+ dataSample._2.size
     val topology = FeedForwardTopology.multiLayerPerceptron(layerSizes, false)
@@ -53,22 +53,22 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("ANN with SoftMax learns XOR function with 2-bit output and batch GD optimizer") {
     val inputs = Array[Array[Double]](
-      Array[Double](0, 0),
-      Array[Double](0, 1),
-      Array[Double](1, 0),
-      Array[Double](1, 1)
+      Array(0.0, 0.0),
+      Array(0.0, 1.0),
+      Array(1.0, 0.0),
+      Array(1.0, 1.0)
     )
     val outputs = Array[Array[Double]](
-      Array[Double](1, 0),
-      Array[Double](0, 1),
-      Array[Double](0, 1),
-      Array[Double](1, 0)
+      Array(1.0, 0.0),
+      Array(0.0, 1.0),
+      Array(0.0, 1.0),
+      Array(1.0, 0.0)
    )
     val data = inputs.zip(outputs).map { case (features, label) =>
       (Vectors.dense(features), Vectors.dense(label))
     }
     val rddData = sc.parallelize(data, 1)
-    val hiddenLayersTopology = Array[Int](5)
+    val hiddenLayersTopology = Array(5)
     val dataSample = rddData.first()
     val layerSizes = dataSample._1.size +: hiddenLayersTopology :+ dataSample._2.size
     val topology = FeedForwardTopology.multiLayerPerceptron(layerSizes, false)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index 0d2016daff2dd..a42b0b362345f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -48,6 +48,7 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp
       assert(p == l)
     }
   }
+  // TODO: implement a more rigorous test
 
   test("3 class classification with 2 hidden layers") {
     val nPoints = 1000
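End to end, the renamed internals stay behind the public estimator. A minimal usage sketch in the spirit of the suite above (hypothetical XOR frame; assumes an available sqlContext and the usual setLayers/setMaxIter/setTol setters; layer sizes and iteration counts are arbitrary):

    import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val xor = sqlContext.createDataFrame(Seq(
      LabeledPoint(0.0, Vectors.dense(0.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(0.0, Vectors.dense(1.0, 1.0))))

    val mlp = new MultilayerPerceptronClassifier()
      .setLayers(Array(2, 5, 2)) // 2 features, one hidden layer of 5, 2 classes
      .setMaxIter(100)
      .setTol(1e-4)
    val model = mlp.fit(xor)
    model.transform(xor).select("features", "prediction").show()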