diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index a967df857bed3..12b5a0f4178f6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,7 +17,8 @@ package org.apache.spark.mllib.classification -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis} +import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels import org.apache.spark.{SparkException, Logging} import org.apache.spark.SparkContext._ @@ -25,6 +26,15 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD + +/** + * + */ +object NaiveBayesModels extends Enumeration { + type NaiveBayesModels = Value + val Multinomial, Bernoulli = Value +} + /** * Model for Naive Bayes Classifiers. * @@ -32,28 +42,42 @@ import org.apache.spark.rdd.RDD * @param pi log of class priors, whose dimension is C, number of labels * @param theta log of class conditional probabilities, whose dimension is C-by-D, * where D is number of features + * @param model The type of NB model to fit from the enumeration NaiveBayesModels, can be + * Multinomial or Bernoulli */ + class NaiveBayesModel private[mllib] ( val labels: Array[Double], val pi: Array[Double], - val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { - - private val brzPi = new BDV[Double](pi) - private val brzTheta = new BDM[Double](theta.length, theta(0).length) + val theta: Array[Array[Double]], + val model: NaiveBayesModels) extends ClassificationModel with Serializable { - { - // Need to put an extra pair of braces to prevent Scala treating `i` as a member. + def populateMatrix(arrayIn: Array[Array[Double]], + matrixIn: BDM[Double], + transformation: (Double) => Double = (x) => x) = { var i = 0 - while (i < theta.length) { + while (i < arrayIn.length) { var j = 0 - while (j < theta(i).length) { - brzTheta(i, j) = theta(i)(j) + while (j < arrayIn(i).length) { + matrixIn(i, j) = transformation(theta(i)(j)) j += 1 } i += 1 } } + private val brzPi = new BDV[Double](pi) + private val brzTheta = new BDM[Double](theta.length, theta(0).length) + populateMatrix(theta, brzTheta) + + private val brzNegTheta: Option[BDM[Double]] = model match { + case NaiveBayesModels.Multinomial => None + case NaiveBayesModels.Bernoulli => + val negTheta = new BDM[Double](theta.length, theta(0).length) + populateMatrix(theta, negTheta, (x) => math.log(1.0 - math.exp(x))) + Option(negTheta) + } + override def predict(testData: RDD[Vector]): RDD[Double] = { val bcModel = testData.context.broadcast(this) testData.mapPartitions { iter => @@ -63,7 +87,14 @@ class NaiveBayesModel private[mllib] ( } override def predict(testData: Vector): Double = { - labels(brzArgmax(brzPi + brzTheta * testData.toBreeze)) + model match { + case NaiveBayesModels.Multinomial => + labels (brzArgmax (brzPi + brzTheta * testData.toBreeze) ) + case NaiveBayesModels.Bernoulli => + labels (brzArgmax (brzPi + + (brzTheta - brzNegTheta.get) * testData.toBreeze + + brzSum(brzNegTheta.get, Axis._1))) + } } } @@ -75,9 +106,12 @@ class NaiveBayesModel private[mllib] ( * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative. */ -class NaiveBayes private (private var lambda: Double) extends Serializable with Logging { +class NaiveBayes private (private var lambda: Double, + var model: NaiveBayesModels) extends Serializable with Logging { - def this() = this(1.0) + def this(lambda: Double) = this(lambda, NaiveBayesModels.Multinomial) + + def this() = this(1.0, NaiveBayesModels.Multinomial) /** Set the smoothing parameter. Default: 1.0. */ def setLambda(lambda: Double): NaiveBayes = { @@ -85,6 +119,13 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with this } + /** Set the model type. Default: Multinomial. */ + def setModelType(model: NaiveBayesModels): NaiveBayes = { + this.model = model + this + } + + /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * @@ -118,21 +159,27 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) => (c1._1 + c2._1, c1._2 += c2._2) ).collect() + val numLabels = aggregated.length var numDocuments = 0L aggregated.foreach { case (_, (n, _)) => numDocuments += n } val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } + val labels = new Array[Double](numLabels) val pi = new Array[Double](numLabels) val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) + val piLogDenom = math.log(numDocuments + numLabels * lambda) var i = 0 aggregated.foreach { case (label, (n, sumTermFreqs)) => labels(i) = label - val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda) pi(i) = math.log(n + lambda) - piLogDenom + val thetaLogDenom = model match { + case NaiveBayesModels.Multinomial => math.log(brzSum(sumTermFreqs) + numFeatures * lambda) + case NaiveBayesModels.Bernoulli => math.log(n + 2.0 * lambda) + } var j = 0 while (j < numFeatures) { theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom @@ -141,7 +188,7 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with i += 1 } - new NaiveBayesModel(labels, pi, theta) + new NaiveBayesModel(labels, pi, theta, model) } } @@ -154,8 +201,7 @@ object NaiveBayes { * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for - * document classification. By making every vector a 0-1 vector, it can also be used as - * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * document classification. * * This version of the method uses a default smoothing parameter of 1.0. * @@ -171,8 +217,7 @@ object NaiveBayes { * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for - * document classification. By making every vector a 0-1 vector, it can also be used as - * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * document classification. * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. @@ -181,4 +226,25 @@ object NaiveBayes { def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } + + + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is by default the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle + * all kinds of discrete data. For example, by converting documents into TF-IDF vectors, + * it can be used for document classification. By making every vector a 0-1 vector and + * setting the model type to NaiveBayesModels.Bernoulli, it fits and predicts as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + * @param lambda The smoothing parameter + * + * @param model The type of NB model to fit from the enumeration NaiveBayesModels, can be + * Multinomial or Bernoulli + */ + def train(input: RDD[LabeledPoint], lambda: Double, model: NaiveBayesModels): NaiveBayesModel = { + new NaiveBayes(lambda, model).run(input) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index e68fe89d6ccea..44ba6118eb61d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.mllib.classification +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis} +import breeze.stats.distributions.Multinomial +import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels + import scala.util.Random import org.scalatest.FunSuite @@ -39,10 +43,12 @@ object NaiveBayesSuite { // Generate input of the form Y = (theta * x).argmax() def generateNaiveBayesInput( - pi: Array[Double], // 1XC - theta: Array[Array[Double]], // CXD - nPoints: Int, - seed: Int): Seq[LabeledPoint] = { + pi: Array[Double], // 1XC + theta: Array[Array[Double]], // CXD + nPoints: Int, + seed: Int, + dataModel: NaiveBayesModels = NaiveBayesModels.Multinomial, + sample: Int = 10): Seq[LabeledPoint] = { val D = theta(0).length val rnd = new Random(seed) @@ -51,8 +57,17 @@ object NaiveBayesSuite { for (i <- 0 until nPoints) yield { val y = calcLabel(rnd.nextDouble(), _pi) - val xi = Array.tabulate[Double](D) { j => - if (rnd.nextDouble() < _theta(y)(j)) 1 else 0 + val xi = dataModel match { + case NaiveBayesModels.Bernoulli => Array.tabulate[Double] (D) {j => + if (rnd.nextDouble () < _theta(y)(j) ) 1 else 0 + } + case NaiveBayesModels.Multinomial => + val mult = Multinomial(BDV(_theta(y))) + val emptyMap = (0 until D).map(x => (x, 0.0)).toMap + val counts = emptyMap ++ mult.sample(sample).groupBy(x => x).map { + case (index, reps) => (index, reps.size.toDouble) + } + counts.toArray.sortBy(_._1).map(_._2) } LabeledPoint(y, Vectors.dense(xi)) @@ -71,23 +86,68 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext { assert(numOfPredictions < input.length / 5) } - test("Naive Bayes") { + def validateModelFit(piData: Array[Double], thetaData: Array[Array[Double]], model: NaiveBayesModel) = { + def closeFit(d1: Double, d2: Double, precision: Double): Boolean = { + (d1 - d2).abs <= precision + } + val modelIndex = (0 until piData.length).zip(model.labels.map(_.toInt)) + for (i <- modelIndex) { + assert(closeFit(math.exp(piData(i._2)), math.exp(model.pi(i._1)), 0.05)) + } + for (i <- modelIndex) { + val sortedData = thetaData(i._2).sorted + val sortedModel = model.theta(i._1).sorted + for (j <- 0 until sortedData.length) { + assert(closeFit(math.exp(sortedData(j)), math.exp(sortedModel(j)), 0.05)) + } + } + } + + test("Naive Bayes Multinomial") { + val nPoints = 1000 + + val pi = Array(0.5, 0.1, 0.4).map(math.log) + val theta = Array( + Array(0.70, 0.10, 0.10, 0.10), // label 0 + Array(0.10, 0.70, 0.10, 0.10), // label 1 + Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + + val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42, NaiveBayesModels.Multinomial) + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Multinomial) + validateModelFit(pi, theta, model) + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17, NaiveBayesModels.Multinomial) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("Naive Bayes Bernoulli") { val nPoints = 10000 val pi = Array(0.5, 0.3, 0.2).map(math.log) val theta = Array( - Array(0.91, 0.03, 0.03, 0.03), // label 0 - Array(0.03, 0.91, 0.03, 0.03), // label 1 - Array(0.03, 0.03, 0.91, 0.03) // label 2 + Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0 + Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1 + Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2 ).map(_.map(math.log)) - val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) + val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 45, NaiveBayesModels.Bernoulli) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val model = NaiveBayes.train(testRDD) + val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Bernoulli) ///!!! this gives same result on both models check the math + validateModelFit(pi, theta, model) - val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 20, NaiveBayesModels.Bernoulli) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD.