
[SPARK-12664][ML] Expose probability in mlp model #17373

Closed
53 changes: 45 additions & 8 deletions mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -361,17 +361,42 @@ private[ann] trait TopologyModel extends Serializable {
* Forward propagation
*
* @param data input data
* @param includeLastLayer Include the last layer in the output. In
* MultilayerPerceptronClassifier, the last layer is always softmax;
* the last layer of outputs is needed for class predictions, but not
* for rawPrediction.
*
* @return array of outputs for each of the layers
*/
def forward(data: BDM[Double]): Array[BDM[Double]]
def forward(data: BDM[Double], includeLastLayer: Boolean): Array[BDM[Double]]
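As a rough illustration of the new `includeLastLayer` flag, the flag simply truncates the stack of layers applied during the forward pass; dropping the final softmax leaves the pre-softmax activations that serve as `rawPrediction`. The sketch below uses plain Python with illustrative names, not Spark's actual API or Breeze matrices:

```python
import math

def affine(w, b):
    """Build a dense layer: returns a function computing w @ x + b."""
    def eval_(x):
        return [sum(wi * xi for wi, xi in zip(row, x)) + bi
                for row, bi in zip(w, b)]
    return eval_

def softmax(x):
    m = max(x)                              # shift by max for numerical stability
    exps = [math.exp(v - m) for v in x]
    z = sum(exps)
    return [e / z for e in exps]

def forward(layers, data, include_last_layer):
    """Return the output of each evaluated layer, optionally skipping the last."""
    active = layers if include_last_layer else layers[:-1]
    outputs = []
    current = data
    for layer in active:
        current = layer(current)
        outputs.append(current)
    return outputs
```

With `include_last_layer=False` the last entry of the result is the pre-softmax activations (the raw prediction); with `True` it is the probability vector.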

/**
* Prediction of the model
* Prediction of the model. See {@link ProbabilisticClassificationModel}
*
* @param data input data
* @param features input features
* @return prediction
*/
def predict(data: Vector): Vector
def predict(features: Vector): Vector

/**
* Raw prediction of the model. See {@link ProbabilisticClassificationModel}
*
* @param features input features
* @return raw prediction
*
* Note: This method is only used for classification models.
*/
def predictRaw(features: Vector): Vector

/**
* Converts raw predictions to probabilities, in place. See {@link ProbabilisticClassificationModel}
*
* @param rawPrediction raw prediction vector
* @return probability
*
* Note: This method is only used for classification models.
*/
def raw2ProbabilityInPlace(rawPrediction: Vector): Vector

/**
* Computes gradient for the network
@@ -463,7 +488,7 @@ private[ml] class FeedForwardModel private(
private var outputs: Array[BDM[Double]] = null
private var deltas: Array[BDM[Double]] = null

override def forward(data: BDM[Double]): Array[BDM[Double]] = {
override def forward(data: BDM[Double], includeLastLayer: Boolean): Array[BDM[Double]] = {
// Initialize output arrays for all layers. Special treatment for InPlace
val currentBatchSize = data.cols
// TODO: allocate outputs as one big array and then create BDMs from it
}
}
layerModels(0).eval(data, outputs(0))
for (i <- 1 until layerModels.length) {
val end = if (includeLastLayer) layerModels.length else layerModels.length - 1
for (i <- 1 until end) {
layerModels(i).eval(outputs(i - 1), outputs(i))
}
outputs
target: BDM[Double],
cumGradient: Vector,
realBatchSize: Int): Double = {
val outputs = forward(data)
val outputs = forward(data, true)
val currentBatchSize = data.cols
// TODO: allocate deltas as one big array and then create BDMs from it
if (deltas == null || deltas(0).cols != currentBatchSize) {
@@ -527,9 +553,20 @@

override def predict(data: Vector): Vector = {
val size = data.size
val result = forward(new BDM[Double](size, 1, data.toArray))
val result = forward(new BDM[Double](size, 1, data.toArray), true)
Vectors.dense(result.last.toArray)
}

override def predictRaw(data: Vector): Vector = {
val result = forward(new BDM[Double](data.size, 1, data.toArray), false)
Vectors.dense(result(result.length - 2).toArray)
}

override def raw2ProbabilityInPlace(data: Vector): Vector = {
val dataMatrix = new BDM[Double](data.size, 1, data.toArray)
Contributor Author:

Added raw2ProbabilityInPlace; what it computes is:

softmax(rawPredictionsVector) ==> predictionsVector

It directly calls the last layer's eval function to compute this.

layerModels.last.eval(dataMatrix, dataMatrix)
Member:

This assumes that the eval method can operate in-place. That is fine for the last layer for MLP (SoftmaxLayerModelWithCrossEntropyLoss), but not OK in general. More generally, these methods for classifiers should not go in the very general TopologyModel abstraction; that abstraction may be used in the future for regression as well. I'd be fine with putting this classification-specific logic in MLP itself; we do not need to generalize the logic until we add other Classifiers, which might take a long time.

Member:

Ping: If this proposal sounds good, then can you please update accordingly?

data
}
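For reference, the in-place softmax that raw2ProbabilityInPlace relies on (via the last layer's eval) can be sketched in isolation. This is a plain-Python illustration of the semantics under discussion, not Spark's implementation:

```python
import math

def raw2probability_in_place(raw):
    """Overwrite a raw-prediction list with its softmax probabilities, in place."""
    m = max(raw)                            # subtract max for numerical stability
    for i, v in enumerate(raw):
        raw[i] = math.exp(v - m)
    z = sum(raw)
    for i in range(len(raw)):
        raw[i] /= z
    return raw
```

The in-place update is exactly what the reviewer flags: it is safe for a softmax layer, which can read and write the same buffer, but not for layers in general.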
}

/**
@@ -32,7 +32,7 @@ import org.apache.spark.ml.util._
import org.apache.spark.sql.Dataset

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends PredictorParams
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {

import MultilayerPerceptronClassifier._
@@ -143,7 +143,8 @@ private object LabelConverter {
@Since("1.5.0")
class MultilayerPerceptronClassifier @Since("1.5.0") (
@Since("1.5.0") override val uid: String)
extends Predictor[Vector, MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel]
extends ProbabilisticClassifier[Vector, MultilayerPerceptronClassifier,
MultilayerPerceptronClassificationModel]
with MultilayerPerceptronParams with DefaultParamsWritable {

@Since("1.5.0")
@@ -301,13 +302,13 @@ class MultilayerPerceptronClassificationModel private[ml] (
@Since("1.5.0") override val uid: String,
@Since("1.5.0") val layers: Array[Int],
@Since("2.0.0") val weights: Vector)
extends PredictionModel[Vector, MultilayerPerceptronClassificationModel]
extends ProbabilisticClassificationModel[Vector, MultilayerPerceptronClassificationModel]
with Serializable with MLWritable {

@Since("1.6.0")
override val numFeatures: Int = layers.head

private val mlpModel = FeedForwardTopology
private[ml] val mlpModel = FeedForwardTopology
.multiLayerPerceptron(layers, softmaxOnTop = true)
.model(weights)

@@ -335,6 +336,14 @@ class MultilayerPerceptronClassificationModel private[ml] (
@Since("2.0.0")
override def write: MLWriter =
new MultilayerPerceptronClassificationModel.MultilayerPerceptronClassificationModelWriter(this)

override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = {
mlpModel.raw2ProbabilityInPlace(rawPrediction)
}

override protected def predictRaw(features: Vector): Vector = mlpModel.predictRaw(features)

override def numClasses: Int = layers.last
}
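With these overrides in place, the model follows the standard ProbabilisticClassificationModel flow: compute rawPrediction, convert it to a probability vector, then take the argmax as the predicted class. A minimal sketch of that per-row chain (hypothetical names, plain Python rather than Spark's Scala API):

```python
import math

def softmax(raw):
    m = max(raw)                            # shift for numerical stability
    exps = [math.exp(v - m) for v in raw]
    z = sum(exps)
    return [e / z for e in exps]

def transform_row(predict_raw, features):
    """Sketch of the rawPrediction -> probability -> prediction chain."""
    raw = predict_raw(features)             # pre-softmax layer outputs
    prob = softmax(raw)                     # raw2probabilityInPlace analogue
    prediction = max(range(len(prob)), key=prob.__getitem__)
    return raw, prob, prediction
```

This is why the PR only needs predictRaw and raw2probabilityInPlace: the base class supplies the rest of the chain.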

@Since("2.0.0")
@@ -64,7 +64,7 @@ class GradientSuite extends SparkFunSuite with MLlibTestSparkContext {
}

private def computeLoss(input: BDM[Double], target: BDM[Double], model: TopologyModel): Double = {
val outputs = model.forward(input)
val outputs = model.forward(input, true)
model.layerModels.last match {
case layerWithLoss: LossFunction =>
layerWithLoss.loss(outputs.last, target, new BDM[Double](target.rows, target.cols))
@@ -29,6 +29,7 @@ import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._

class MultilayerPerceptronClassifierSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -82,6 +83,47 @@ class MultilayerPerceptronClassifierSuite
}
}

test("Predicted class probabilities: calibration on toy dataset") {
val layers = Array[Int](4, 5, 2)

val strongDataset = Seq(
(Vectors.dense(1, 2, 3, 4), 0d, Vectors.dense(1d, 0d)),
(Vectors.dense(4, 3, 2, 1), 1d, Vectors.dense(0d, 1d)),
(Vectors.dense(1, 1, 1, 1), 0d, Vectors.dense(.5, .5)),
(Vectors.dense(1, 1, 1, 1), 1d, Vectors.dense(.5, .5))
).toDF("features", "label", "expectedProbability")
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(1)
.setSeed(123L)
.setMaxIter(100)
.setSolver("l-bfgs")
val model = trainer.fit(strongDataset)
val result = model.transform(strongDataset)
result.select("probability", "expectedProbability").collect().foreach {
case Row(p: Vector, e: Vector) =>
assert(p ~== e absTol 1e-3)
}
}

test("test model probability") {
val layers = Array[Int](2, 5, 2)
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(1)
.setSeed(123L)
.setMaxIter(100)
.setSolver("l-bfgs")
val model = trainer.fit(dataset)
model.setProbabilityCol("probability")
Member:

That's the default already, right?

Member:

Ping --- this should not be necessary

val result = model.transform(dataset)
val features2prob = udf { features: Vector => model.mlpModel.predict(features) }
result.select(features2prob(col("features")), col("probability")).collect().foreach {
case Row(p1: Vector, p2: Vector) =>
assert(p1 ~== p2 absTol 1e-3)
}
}

test("Test setWeights by training restart") {
val dataFrame = Seq(
(Vectors.dense(0.0, 0.0), 0.0),
4 changes: 2 additions & 2 deletions python/pyspark/ml/classification.py
@@ -1378,7 +1378,7 @@ class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol,
>>> testDF = spark.createDataFrame([
... (Vectors.dense([1.0, 0.0]),),
... (Vectors.dense([0.0, 0.0]),)], ["features"])
>>> model.transform(testDF).show()
>>> model.transform(testDF).select("features", "prediction").show()
+---------+----------+
| features|prediction|
+---------+----------+
@@ -1512,7 +1512,7 @@ def getInitialWeights(self):
return self.getOrDefault(self.initialWeights)


class MultilayerPerceptronClassificationModel(JavaModel, JavaPredictionModel, JavaMLWritable,
class MultilayerPerceptronClassificationModel(JavaModel, JavaClassificationModel, JavaMLWritable,
JavaMLReadable):
"""
Model fitted by MultilayerPerceptronClassifier.