From 0bd8434243fd5710e6d2de6768328fa9ba4bc562 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Tue, 8 Jul 2014 12:25:57 +0400 Subject: [PATCH 01/16] Chi Squared feature selection: initial version --- .../spark/mllib/feature/ChiSquared.scala | 44 +++++++++++++++++++ .../spark/mllib/feature/ChiSquaredSuite.scala | 24 ++++++++++ 2 files changed, 68 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala new file mode 100644 index 0000000000000..4b5e42b0485a1 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -0,0 +1,44 @@ +package org.apache.spark.mllib.feature + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +class ChiSquared(labeledData:RDD[LabeledPoint]) extends java.io.Serializable{ + + val indexByLabel = labeledData.map(labeledPoint => labeledPoint.label).distinct.collect.zipWithIndex.toMap + val labelsByIndex = indexByLabel.map(_.swap) + + lazy val chi2Data:RDD[((Int, Double), Double)] = labeledData.flatMap{ + labeledPoint => + labeledPoint.features.toArray.zipWithIndex.map{ + case(featureValue, featureIndex) => + /** array of feature presence/absence in a class */ + val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) + val label = labeledPoint.label + counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) + (featureIndex, counts) + } + }.reduceByKey{ + case(x, y) => + x.zip(y).map{case((a1, b1), (a2, b2)) => + (a1 + a2, b1 + b2)} + }.flatMap{ + case(featureIndex, counts) => + val (featureCount, notFeatureCount) = counts.foldLeft((0, 0)){case((sumA, sumB),(a, b)) => + (sumA + a, sumB + b)} + val (featureClassCounts, notFeatureClassCounts) = counts.unzip + val notFeatureNotClassCounts = notFeatureClassCounts.map(notFeatureCount - _) + val featureNotClassCounts = featureClassCounts.map(featureCount - _) + val iCounts = counts.zipWithIndex + iCounts.map{case((a, b), labelIndex) => + val n11 = featureClassCounts(labelIndex) + val n10 = featureNotClassCounts(labelIndex) + val n01 = notFeatureClassCounts(labelIndex) + val n00 = notFeatureNotClassCounts(labelIndex) + val chi2 = (n11 + n10 + n01 + n00) * sqr(n11 * n00 - n10 * n01).toDouble / + ((n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00)) + ((featureIndex, labelsByIndex(labelIndex)), chi2)} + } + private def sqr(x:Int):Int = x * x +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala new file mode 100644 index 0000000000000..98785c79912c0 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -0,0 +1,24 @@ +package org.apache.spark.mllib.feature + +import org.apache.spark.mllib.linalg.{SparseVector, Vectors} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.util.LocalSparkContext + +import org.scalatest.FunSuite + +class ChiSquaredSuite extends FunSuite with LocalSparkContext { + + test("Chi Squared feature selection test") { + val labeledData = sc.parallelize( + Seq( new LabeledPoint(0.0, Vectors.sparse(3, Seq((0, 8.8),(1, 9.9)))), + new LabeledPoint(1.0, Vectors.sparse(3, 
Seq((0, 1.1),(2, 3.3)))), + new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) + ), 2) + + val chiSquared = new ChiSquared(labeledData) + chiSquared.chi2Data.foreach{println} + } + + +} From 2ade254c8ecb8e39cae7fe9da0a909269a629433 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Tue, 8 Jul 2014 13:43:27 +0400 Subject: [PATCH 02/16] Code style --- .../spark/mllib/feature/ChiSquared.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 4b5e42b0485a1..8b33e7149557b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,34 +4,35 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -class ChiSquared(labeledData:RDD[LabeledPoint]) extends java.io.Serializable{ +class ChiSquared(labeledData:RDD[LabeledPoint]) extends java.io.Serializable { - val indexByLabel = labeledData.map(labeledPoint => labeledPoint.label).distinct.collect.zipWithIndex.toMap + val indexByLabel = labeledData.map(labeledPoint => + labeledPoint.label).distinct.collect.zipWithIndex.toMap val labelsByIndex = indexByLabel.map(_.swap) - lazy val chi2Data:RDD[((Int, Double), Double)] = labeledData.flatMap{ + lazy val chi2Data: RDD[((Int, Double), Double)] = labeledData.flatMap { labeledPoint => - labeledPoint.features.toArray.zipWithIndex.map{ - case(featureValue, featureIndex) => - /** array of feature presence/absence in a class */ - val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) - val label = labeledPoint.label - counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) - (featureIndex, counts) + labeledPoint.features.toArray.zipWithIndex.map { + case (featureValue, featureIndex) => + /** array of feature presence/absence in a class */ + val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) + val label = labeledPoint.label + counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) + (featureIndex, counts) } - }.reduceByKey{ - case(x, y) => - x.zip(y).map{case((a1, b1), (a2, b2)) => + }.reduceByKey { + case (x, y) => + x.zip(y).map { case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2)} - }.flatMap{ - case(featureIndex, counts) => - val (featureCount, notFeatureCount) = counts.foldLeft((0, 0)){case((sumA, sumB),(a, b)) => - (sumA + a, sumB + b)} + }.flatMap { + case (featureIndex, counts) => val (featureClassCounts, notFeatureClassCounts) = counts.unzip + val featureCount = featureClassCounts.sum + val notFeatureCount = notFeatureClassCounts.sum val notFeatureNotClassCounts = notFeatureClassCounts.map(notFeatureCount - _) val featureNotClassCounts = featureClassCounts.map(featureCount - _) val iCounts = counts.zipWithIndex - iCounts.map{case((a, b), labelIndex) => + iCounts.map { case ((a, b), labelIndex) => val n11 = featureClassCounts(labelIndex) val n10 = featureNotClassCounts(labelIndex) val n01 = notFeatureClassCounts(labelIndex) From ca49e803ec9610ea676ccdebe2283711f04a9d41 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Wed, 9 Jul 2014 14:06:54 +0400 Subject: [PATCH 03/16] Feature selection filter --- .../spark/mllib/feature/ChiSquared.scala | 2 +- .../mllib/feature/FeatureSelection.scala | 30 +++++++++++++++++++ .../mllib/feature/FeatureSelectionSuite.scala | 21 +++++++++++++ 3 files changed, 
52 insertions(+), 1 deletion(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 8b33e7149557b..7b3fc5a73a2a7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,7 +4,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -class ChiSquared(labeledData:RDD[LabeledPoint]) extends java.io.Serializable { +class ChiSquared(labeledData: RDD[LabeledPoint]) extends java.io.Serializable { val indexByLabel = labeledData.map(labeledPoint => labeledPoint.label).distinct.collect.zipWithIndex.toMap diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala new file mode 100644 index 0000000000000..83d37f153394b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala @@ -0,0 +1,30 @@ +package org.apache.spark.mllib.feature + +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +abstract class FeatureSelection { + + /** make LabeledPoint or Vector a type parameter so it can select from both */ + def select(data: RDD[LabeledPoint]): Set[Int] + + def selectAndFilter(data: RDD[LabeledPoint]): RDD[LabeledPoint] +} + +object FeatureSelection extends java.io.Serializable { + + /** make LabeledPoint or Vector a type parameter so it can filter both */ + def filter(data: RDD[LabeledPoint], indexes: Set[Int]): RDD[LabeledPoint] = + data.map {labeledPoint => + new LabeledPoint(labeledPoint.label, filter(labeledPoint.features, indexes)) + } + + def filter(features: Vector, indexes: Set[Int]): Vector = { + val (values, _) = + features.toArray.zipWithIndex.filter { case (value, index) => + indexes.contains(index)}.unzip + /** probably make a sparse vector if it was initially sparse */ + Vectors.dense(values.toArray) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala new file mode 100644 index 0000000000000..91c13d228303a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala @@ -0,0 +1,21 @@ +package org.apache.spark.mllib.feature + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.LocalSparkContext +import org.scalatest.FunSuite + +class FeatureSelectionSuite extends FunSuite with LocalSparkContext{ + + test("FeatureSelection test") { + val labeledData = sc.parallelize( + Seq( new LabeledPoint(0.0, Vectors.sparse(3, Seq((0, 8.8),(1, 9.9)))), + new LabeledPoint(1.0, Vectors.sparse(3, Seq((0, 1.1),(2, 3.3)))), + new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) + ), 2) + labeledData.foreach(println) + val filteredData = FeatureSelection.filter(labeledData, Set(1,2)) + filteredData.foreach(println) + } + +} From e24eee41720f964443e0410f226b90d7f30bdca8 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Wed, 9 Jul 2014 19:55:07 +0400 
Subject: [PATCH 04/16] Traits for FeatureSelection, CombinationsCalculator and FeatureFilter --- .../spark/mllib/feature/ChiSquared.scala | 71 +++++++++++++------ .../mllib/feature/FeatureSelection.scala | 4 +- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 7b3fc5a73a2a7..94c47e68e1f89 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,27 +4,47 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -class ChiSquared(labeledData: RDD[LabeledPoint]) extends java.io.Serializable { - - val indexByLabel = labeledData.map(labeledPoint => - labeledPoint.label).distinct.collect.zipWithIndex.toMap - val labelsByIndex = indexByLabel.map(_.swap) - - lazy val chi2Data: RDD[((Int, Double), Double)] = labeledData.flatMap { - labeledPoint => - labeledPoint.features.toArray.zipWithIndex.map { - case (featureValue, featureIndex) => - /** array of feature presence/absence in a class */ - val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) - val label = labeledPoint.label - counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) - (featureIndex, counts) +private[feature] trait FeatureFilter extends java.io.Serializable { + /** methods for feature filtering based on statistics */ + protected def topForClass +} + +private[feature] trait CombinationsCalculator extends java.io.Serializable { + + protected def indexByLabelMap(labeledData: RDD[LabeledPoint]) = { + labeledData.map(labeledPoint => + labeledPoint.label).distinct.collect.zipWithIndex.toMap + } + + protected def featureLabelCombinations(labeledData: RDD[LabeledPoint]) = { + val indexByLabel = indexByLabelMap(labeledData) + labeledData.flatMap { + labeledPoint => + labeledPoint.features.toArray.zipWithIndex.map { + case (featureValue, featureIndex) => + /** array of feature presence/absence in a class */ + val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) + val label = labeledPoint.label + counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) + (featureIndex, counts) + } + }.reduceByKey { + case (x, y) => + x.zip(y).map { case ((a1, b1), (a2, b2)) => + (a1 + a2, b1 + b2)} } - }.reduceByKey { - case (x, y) => - x.zip(y).map { case ((a1, b1), (a2, b2)) => - (a1 + a2, b1 + b2)} - }.flatMap { + } +} + +private[feature] class ChiSquared(labeledData: RDD[LabeledPoint]) + extends java.io.Serializable with CombinationsCalculator with FeatureSelection { + + private val indexByLabel = indexByLabelMap(labeledData) + private val labelsByIndex = indexByLabel.map(_.swap) + private val combinations = featureLabelCombinations(labeledData) + + /** not private for test purposes */ + lazy val chi2Data: RDD[((Int, Double), Double)] = combinations.flatMap { case (featureIndex, counts) => val (featureClassCounts, notFeatureClassCounts) = counts.unzip val featureCount = featureClassCounts.sum @@ -41,5 +61,14 @@ class ChiSquared(labeledData: RDD[LabeledPoint]) extends java.io.Serializable { ((n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00)) ((featureIndex, labelsByIndex(labelIndex)), chi2)} } - private def sqr(x:Int):Int = x * x + private def sqr(x: Int): Int = x * x + + override def select(data: RDD[LabeledPoint]): Set[Int] = { + /** the actual selection process should run 
here*/ + Set(1, 2) + } } + +object ChiSquared{ + def compute(labeledData: RDD[LabeledPoint]) = new ChiSquared(labeledData) +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala index 83d37f153394b..84863de7c34ca 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala @@ -4,12 +4,10 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -abstract class FeatureSelection { +trait FeatureSelection { /** make LabeledPoint or Vector a type parameter so it can select from both */ def select(data: RDD[LabeledPoint]): Set[Int] - - def selectAndFilter(data: RDD[LabeledPoint]): RDD[LabeledPoint] } object FeatureSelection extends java.io.Serializable { From aab9b738fc63bed99dbce066c395a12454fc0ae8 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 11 Jul 2014 13:14:29 +0400 Subject: [PATCH 05/16] Feature selection redesign with vigdorchik --- .../spark/mllib/feature/ChiSquared.scala | 36 +++++++++---------- .../mllib/feature/FeatureSelection.scala | 24 +++++++------ .../mllib/feature/FeatureSelectionSuite.scala | 2 +- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 94c47e68e1f89..3065b48de6112 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,9 +4,14 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -private[feature] trait FeatureFilter extends java.io.Serializable { +private[feature] trait FeatureSelector extends java.io.Serializable { /** methods for feature filtering based on statistics */ - protected def topForClass + protected def top(featureClassValues: RDD[((Int, Double), Double)], n: Int): Set[Int] = { + val (featureIndexes, _) = featureClassValues.map { + case ((featureIndex, label), value) => (featureIndex, value) + }.reduceByKey(Math.max(_, _)).collect().sortBy(- _._2).unzip + featureIndexes.toSet + } } private[feature] trait CombinationsCalculator extends java.io.Serializable { @@ -36,15 +41,18 @@ private[feature] trait CombinationsCalculator extends java.io.Serializable { } } -private[feature] class ChiSquared(labeledData: RDD[LabeledPoint]) - extends java.io.Serializable with CombinationsCalculator with FeatureSelection { +class ChiSquared(labeledData: RDD[LabeledPoint]) +extends java.io.Serializable with CombinationsCalculator with LabeledPointFeatureFilter { + + override def data: RDD[LabeledPoint] = labeledData - private val indexByLabel = indexByLabelMap(labeledData) - private val labelsByIndex = indexByLabel.map(_.swap) - private val combinations = featureLabelCombinations(labeledData) + override def select: Set[Int] = Set(1, 2) - /** not private for test purposes */ - lazy val chi2Data: RDD[((Int, Double), Double)] = combinations.flatMap { + val indexByLabel = indexByLabelMap(labeledData) + val labelsByIndex = indexByLabel.map(_.swap) + val combinations = featureLabelCombinations(labeledData) + + val chi2Data: RDD[((Int, Double), Double)] = combinations.flatMap { case (featureIndex, counts) 
=> val (featureClassCounts, notFeatureClassCounts) = counts.unzip val featureCount = featureClassCounts.sum @@ -61,14 +69,6 @@ private[feature] class ChiSquared(labeledData: RDD[LabeledPoint]) ((n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00)) ((featureIndex, labelsByIndex(labelIndex)), chi2)} } - private def sqr(x: Int): Int = x * x - override def select(data: RDD[LabeledPoint]): Set[Int] = { - /** the actual selection process should run here*/ - Set(1, 2) - } + private def sqr(x: Int): Int = x * x } - -object ChiSquared{ - def compute(labeledData: RDD[LabeledPoint]) = new ChiSquared(labeledData) -} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala index 84863de7c34ca..9573611c29335 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala @@ -4,21 +4,23 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -trait FeatureSelection { - - /** make LabeledPoint or Vector a type parameter so it can select from both */ - def select(data: RDD[LabeledPoint]): Set[Int] +trait FeatureSelection[T] extends java.io.Serializable { + def data: RDD[T] + def select: Set[Int] } -object FeatureSelection extends java.io.Serializable { +sealed trait FeatureFilter[T] extends FeatureSelection[T] { + def filter: RDD[T] +} - /** make LabeledPoint or Vector a type parameter so it can filter both */ - def filter(data: RDD[LabeledPoint], indexes: Set[Int]): RDD[LabeledPoint] = - data.map {labeledPoint => - new LabeledPoint(labeledPoint.label, filter(labeledPoint.features, indexes)) - } +trait LabeledPointFeatureFilter extends FeatureFilter[LabeledPoint] { + lazy val indices = select + def filter: RDD[LabeledPoint] = + data.map { lp => new LabeledPoint(lp.label, Compress(lp.features, indices)) } +} - def filter(features: Vector, indexes: Set[Int]): Vector = { +object Compress { + def apply(features: Vector, indexes: Set[Int]): Vector = { val (values, _) = features.toArray.zipWithIndex.filter { case (value, index) => indexes.contains(index)}.unzip diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala index 91c13d228303a..f988322c56640 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala @@ -14,7 +14,7 @@ class FeatureSelectionSuite extends FunSuite with LocalSparkContext{ new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) ), 2) labeledData.foreach(println) - val filteredData = FeatureSelection.filter(labeledData, Set(1,2)) + val filteredData = new ChiSquared(labeledData).filter filteredData.foreach(println) } From 66e0333e0bc19822acdcc8ea3cae7a1f47efe3cb Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 11 Jul 2014 20:04:36 +0400 Subject: [PATCH 06/16] Feature selector, fix of lazyness --- .../spark/mllib/feature/ChiSquared.scala | 17 +++++++++------ .../mllib/feature/FeatureSelection.scala | 5 +++-- .../spark/mllib/feature/ChiSquaredSuite.scala | 3 +++ .../mllib/feature/FeatureSelectionSuite.scala | 21 ------------------- 4 files changed, 17 insertions(+), 29 deletions(-) delete mode 100644 
mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 3065b48de6112..090cb13acf471 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,13 +4,14 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -private[feature] trait FeatureSelector extends java.io.Serializable { +private[feature] trait FeatureSort extends java.io.Serializable { /** methods for feature filtering based on statistics */ protected def top(featureClassValues: RDD[((Int, Double), Double)], n: Int): Set[Int] = { + println(featureClassValues.first()) val (featureIndexes, _) = featureClassValues.map { case ((featureIndex, label), value) => (featureIndex, value) }.reduceByKey(Math.max(_, _)).collect().sortBy(- _._2).unzip - featureIndexes.toSet + featureIndexes.take(n).toSet } } @@ -42,14 +43,18 @@ private[feature] trait CombinationsCalculator extends java.io.Serializable { } class ChiSquared(labeledData: RDD[LabeledPoint]) -extends java.io.Serializable with CombinationsCalculator with LabeledPointFeatureFilter { +extends java.io.Serializable with CombinationsCalculator +with LabeledPointFeatureFilter with FeatureSort { override def data: RDD[LabeledPoint] = labeledData - override def select: Set[Int] = Set(1, 2) + override def select: Set[Int] = { - val indexByLabel = indexByLabelMap(labeledData) - val labelsByIndex = indexByLabel.map(_.swap) + println(chi2Data.first()) + top(chi2Data, 1) + } + + val labelsByIndex = indexByLabelMap(labeledData).map(_.swap) val combinations = featureLabelCombinations(labeledData) val chi2Data: RDD[((Int, Double), Double)] = combinations.flatMap { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala index 9573611c29335..4feccc1816eb0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala @@ -14,9 +14,10 @@ sealed trait FeatureFilter[T] extends FeatureSelection[T] { } trait LabeledPointFeatureFilter extends FeatureFilter[LabeledPoint] { - lazy val indices = select - def filter: RDD[LabeledPoint] = + def filter: RDD[LabeledPoint] = { + val indices = select data.map { lp => new LabeledPoint(lp.label, Compress(lp.features, indices)) } + } } object Compress { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala index 98785c79912c0..5d8596d6f8759 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -15,9 +15,12 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext { new LabeledPoint(1.0, Vectors.sparse(3, Seq((0, 1.1),(2, 3.3)))), new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) ), 2) + labeledData.foreach(println) val chiSquared = new ChiSquared(labeledData) chiSquared.chi2Data.foreach{println} + val filteredData = chiSquared.filter + filteredData.foreach(println) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala deleted file mode 100644 index f988322c56640..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/FeatureSelectionSuite.scala +++ /dev/null @@ -1,21 +0,0 @@ -package org.apache.spark.mllib.feature - -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.util.LocalSparkContext -import org.scalatest.FunSuite - -class FeatureSelectionSuite extends FunSuite with LocalSparkContext{ - - test("FeatureSelection test") { - val labeledData = sc.parallelize( - Seq( new LabeledPoint(0.0, Vectors.sparse(3, Seq((0, 8.8),(1, 9.9)))), - new LabeledPoint(1.0, Vectors.sparse(3, Seq((0, 1.1),(2, 3.3)))), - new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) - ), 2) - labeledData.foreach(println) - val filteredData = new ChiSquared(labeledData).filter - filteredData.foreach(println) - } - -} From 2bacdc768fe3c4ca429c4ad24070cd34a3c1fc5f Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Mon, 14 Jul 2014 20:52:28 +0400 Subject: [PATCH 07/16] Combinations and chi-squared values test --- .../spark/mllib/feature/ChiSquared.scala | 24 +++-- .../spark/mllib/feature/ChiSquaredSuite.scala | 88 ++++++++++++++++--- 2 files changed, 89 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 090cb13acf471..9c5a8fa32f76f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -19,10 +19,10 @@ private[feature] trait CombinationsCalculator extends java.io.Serializable { protected def indexByLabelMap(labeledData: RDD[LabeledPoint]) = { labeledData.map(labeledPoint => - labeledPoint.label).distinct.collect.zipWithIndex.toMap + labeledPoint.label).distinct.collect.sorted.zipWithIndex.toMap } - protected def featureLabelCombinations(labeledData: RDD[LabeledPoint]) = { + protected def featureLabelCombinations(labeledData: RDD[LabeledPoint]): RDD[(Int, Array[(Int, Int)])] = { val indexByLabel = indexByLabelMap(labeledData) labeledData.flatMap { labeledPoint => @@ -43,21 +43,18 @@ private[feature] trait CombinationsCalculator extends java.io.Serializable { } class ChiSquared(labeledData: RDD[LabeledPoint]) -extends java.io.Serializable with CombinationsCalculator -with LabeledPointFeatureFilter with FeatureSort { + extends java.io.Serializable with CombinationsCalculator + with LabeledPointFeatureFilter with FeatureSort { override def data: RDD[LabeledPoint] = labeledData - override def select: Set[Int] = { - - println(chi2Data.first()) - top(chi2Data, 1) + top(chiSquaredValues, 1) } - val labelsByIndex = indexByLabelMap(labeledData).map(_.swap) - val combinations = featureLabelCombinations(labeledData) + private val labelsByIndex = indexByLabelMap(labeledData).map(_.swap) + private val combinations = featureLabelCombinations(labeledData) - val chi2Data: RDD[((Int, Double), Double)] = combinations.flatMap { + lazy val chiSquaredValues: RDD[((Int, Double), Double)] = combinations.flatMap { case (featureIndex, counts) => val (featureClassCounts, notFeatureClassCounts) = counts.unzip val featureCount = featureClassCounts.sum @@ -70,8 +67,9 @@ with LabeledPointFeatureFilter with FeatureSort { val n10 = featureNotClassCounts(labelIndex) val n01 = notFeatureClassCounts(labelIndex) val n00 = 
notFeatureNotClassCounts(labelIndex) - val chi2 = (n11 + n10 + n01 + n00) * sqr(n11 * n00 - n10 * n01).toDouble / - ((n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00)) + val numerator = (n11 + n10 + n01 + n00) * sqr(n11 * n00 - n10 * n01) + val denominator = (n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00) + val chi2 = if (numerator == 0) 0 else numerator.toDouble / denominator ((featureIndex, labelsByIndex(labelIndex)), chi2)} } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala index 5d8596d6f8759..43ea7c0a3c14b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -1,26 +1,94 @@ package org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.{SparseVector, Vectors} +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.mllib.util.LocalSparkContext + import org.scalatest.FunSuite class ChiSquaredSuite extends FunSuite with LocalSparkContext { - test("Chi Squared feature selection test") { - val labeledData = sc.parallelize( - Seq( new LabeledPoint(0.0, Vectors.sparse(3, Seq((0, 8.8),(1, 9.9)))), - new LabeledPoint(1.0, Vectors.sparse(3, Seq((0, 1.1),(2, 3.3)))), - new LabeledPoint(1.0, Vectors.sparse(3, Seq((1, 2.2),(2, 4.4)))) + def labeledData = sc.parallelize( + Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 9.0, 0.0))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 5.0, 6.0))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 7.0, 8.0))), + new LabeledPoint(2.0, Vectors.dense(Array(8.0, 4.0, 5.0))) + ), 2) + + /* + * feature# 0 1 2 + * presence: Y/N Y/N Y/N + * class_0|1|0||1|0||0|1 + * class_1|0|2||2|0||2|0 + * class_2|1|0||1|0||1|0 + * + * class# 0 1 2 + * presence: Y/N Y/N Y/N + * feature_0||1|1||0|2||1|1| + * ^feature_0||0|2||2|0||0|2| + * feature_1||1|2||2|2||1|3| + * ^feature_1||0|0||0|0||0|0| + * feature_2||0|3||2|1||1|2| + * ^feature_2||1|0||0|1||0|1| + * + */ + test("Feature class combinations test") { + val indexByLabelMap = Map((0.0 -> 0), (1.0 -> 1), (2.0 -> 2)) + val featureLabelCombinations = sc.parallelize( + Seq( (0 -> Array((1, 0), (0, 2), (1, 0))), + (1 -> Array((1, 0), (2, 0), (1, 0))), + (2 -> Array((0, 1), (2, 0), (1, 0))) ), 2) - labeledData.foreach(println) + class CombinationsCalculatorTest(labeledData: RDD[LabeledPoint]) extends CombinationsCalculator { + def combinations = featureLabelCombinations(labeledData) + def index = indexByLabelMap(labeledData) + } + val combinationsTest = new CombinationsCalculatorTest(labeledData) + assert(indexByLabelMap == combinationsTest.index) + val flc = featureLabelCombinations.collectAsMap() + assert(combinationsTest.combinations.collectAsMap().forall { + case (key, value) => flc(key).deep.sameElements(value) + }) + } + private def sqr(x: Int) = x * x + + test("Chi Squared feature selection test") { + val chi2ValuesByClass = sc.parallelize( + Seq( ((0, 0.0), + (1 + 1 + 0 + 2) * sqr(1 * 2 - 1 * 0).toDouble / + ((1 + 0) * (1 + 1) * (1 + 2) * (0 + 2))), + ((0, 1.0), + (0 + 2 + 2 + 0) * sqr(0 * 0 - 2 * 2).toDouble / + ((0 + 2) * (0 + 2) * (0 + 2) * (0 + 2))), + ((0, 2.0), + (1 + 1 + 0 + 2) * sqr(1 * 2 - 1 * 0).toDouble / + ((1 + 0) * (1 + 1) * (2 + 0) * (2 + 1))), + ((1, 0.0), + (1 + 2 + 0 + 0) * sqr(1 * 0 - 2 * 0).toDouble), + ((1, 
1.0), + (2 + 2 + 0 + 0) * sqr(2 * 0 - 2 * 0).toDouble), + ((1, 2.0), + (1 + 3 + 0 + 0) * sqr(1 * 0 - 3 * 0).toDouble), + ((2, 0.0), + (0 + 3 + 1 + 0) * sqr(0 * 0 - 3 * 1).toDouble / + ((0 + 1) * (0 + 3) * (0 + 1) * (0 + 3))), + ((2, 1.0), + (2 + 1 + 1 + 0) * sqr(2 * 1 - 1 * 0).toDouble / + ((2 + 0) * (2 + 1) * (1 + 0) * (1 + 1))), + ((2, 2.0), + (1 + 2 + 1 + 0) * sqr(1 * 1 - 2 * 0).toDouble / + ((1 + 0) * (1 + 2) * (1 + 0) * (1 + 2))) + ), 2) val chiSquared = new ChiSquared(labeledData) - chiSquared.chi2Data.foreach{println} - val filteredData = chiSquared.filter - filteredData.foreach(println) + val c2vbc = chi2ValuesByClass.collectAsMap() + assert(chiSquared.chiSquaredValues.collectAsMap().forall { + case (key, value) => c2vbc(key) == value + }) + } From f356365643d53076572df380912508fa9b661c87 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Thu, 17 Jul 2014 19:20:31 +0400 Subject: [PATCH 08/16] Chi Squared by contingency table. Refactoring --- .../spark/mllib/feature/ChiSquared.scala | 101 +++++++++--------- .../spark/mllib/feature/ChiSquaredSuite.scala | 62 ++--------- 2 files changed, 58 insertions(+), 105 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 9c5a8fa32f76f..e7bc6fa24472f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -4,74 +4,75 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -private[feature] trait FeatureSort extends java.io.Serializable { - /** methods for feature filtering based on statistics */ - protected def top(featureClassValues: RDD[((Int, Double), Double)], n: Int): Set[Int] = { - println(featureClassValues.first()) - val (featureIndexes, _) = featureClassValues.map { - case ((featureIndex, label), value) => (featureIndex, value) - }.reduceByKey(Math.max(_, _)).collect().sortBy(- _._2).unzip - featureIndexes.take(n).toSet - } -} +private[feature] trait ContingencyTableCalculator extends java.io.Serializable { -private[feature] trait CombinationsCalculator extends java.io.Serializable { - - protected def indexByLabelMap(labeledData: RDD[LabeledPoint]) = { - labeledData.map(labeledPoint => + private def indexByLabelMap(discreteData: RDD[LabeledPoint]) = { + discreteData.map(labeledPoint => labeledPoint.label).distinct.collect.sorted.zipWithIndex.toMap } - protected def featureLabelCombinations(labeledData: RDD[LabeledPoint]): RDD[(Int, Array[(Int, Int)])] = { - val indexByLabel = indexByLabelMap(labeledData) - labeledData.flatMap { + private def enumerateValues(discreteData: RDD[LabeledPoint]) = { + discreteData.flatMap(labeledPoint => + (0 to labeledPoint.features.size).zip(labeledPoint.features.toArray)) + .distinct() + .combineByKey[List[Double]]( + createCombiner = (value: Double) => List(value), + mergeValue = (c: List[Double], value: Double) => value :: c, + mergeCombiners = (c1: List[Double], c2: List[Double]) => c1 ::: c2 + ).collectAsMap() + } + + def tables(discreteData: RDD[LabeledPoint]): RDD[(Int, Array[Array[Int]])] = { + val indexByLabel = indexByLabelMap(discreteData) + val classesCount = indexByLabel.size + val valuesByIndex = enumerateValues(discreteData) + discreteData.flatMap { labeledPoint => labeledPoint.features.toArray.zipWithIndex.map { case (featureValue, featureIndex) => /** array of feature presence/absence in a class */ 
- val counts = Array.fill[(Int, Int)](indexByLabel.size)(0, 0) - val label = labeledPoint.label - counts(indexByLabel(label)) = if(featureValue != 0) (1, 0) else (0, 1) + val featureValues = valuesByIndex(featureIndex) + val valuesCount = featureValues.size + val featureValueIndex = featureValues.indexOf(featureValue) + val labelIndex = indexByLabel(labeledPoint.label) + val counts = Array.ofDim[Int](valuesCount, classesCount) + counts(featureValueIndex)(labelIndex) = 1 (featureIndex, counts) } }.reduceByKey { case (x, y) => - x.zip(y).map { case ((a1, b1), (a2, b2)) => - (a1 + a2, b1 + b2)} + x.zip(y).map { case(row1, row2) => row1.zip(row2).map{ case(a, b) => a + b} } } } } -class ChiSquared(labeledData: RDD[LabeledPoint]) - extends java.io.Serializable with CombinationsCalculator - with LabeledPointFeatureFilter with FeatureSort { +private[feature] trait ChiSquared { - override def data: RDD[LabeledPoint] = labeledData - override def select: Set[Int] = { - top(chiSquaredValues, 1) + def compute(contingencyTable: Array[Array[Int]]): Double = { + /** probably use List instead of Array */ + val columns = contingencyTable(0).size + val rowSums = contingencyTable.map( row => row.sum) + val columnSums = contingencyTable.fold(Array.ofDim[Int](columns)){ case (ri, rj) => + ri.zip(rj).map { case (a1, b1) + => a1 + b1}} + val tableSum = rowSums.sum + val rowRatios = rowSums.map( _.toDouble / tableSum) + val expectedTable = rowRatios.map( a => columnSums.map(_ * a)) + val chi2 = contingencyTable.zip(expectedTable).foldLeft(0.0){ case(sum, (obsRow, expRow)) => + obsRow.zip(expRow).map{ case (oValue, eValue) => + sqr(oValue - eValue) / eValue}.sum + sum} + chi2 } - private val labelsByIndex = indexByLabelMap(labeledData).map(_.swap) - private val combinations = featureLabelCombinations(labeledData) + private def sqr(x: Double): Double = x * x +} - lazy val chiSquaredValues: RDD[((Int, Double), Double)] = combinations.flatMap { - case (featureIndex, counts) => - val (featureClassCounts, notFeatureClassCounts) = counts.unzip - val featureCount = featureClassCounts.sum - val notFeatureCount = notFeatureClassCounts.sum - val notFeatureNotClassCounts = notFeatureClassCounts.map(notFeatureCount - _) - val featureNotClassCounts = featureClassCounts.map(featureCount - _) - val iCounts = counts.zipWithIndex - iCounts.map { case ((a, b), labelIndex) => - val n11 = featureClassCounts(labelIndex) - val n10 = featureNotClassCounts(labelIndex) - val n01 = notFeatureClassCounts(labelIndex) - val n00 = notFeatureNotClassCounts(labelIndex) - val numerator = (n11 + n10 + n01 + n00) * sqr(n11 * n00 - n10 * n01) - val denominator = (n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00) - val chi2 = if (numerator == 0) 0 else numerator.toDouble / denominator - ((featureIndex, labelsByIndex(labelIndex)), chi2)} - } +class ChiSquaredFeatureSelection(labeledData: RDD[LabeledPoint], numTopFeatures: Int) extends java.io.Serializable +with LabeledPointFeatureFilter with ContingencyTableCalculator with ChiSquared { + override def data: RDD[LabeledPoint] = labeledData - private def sqr(x: Int): Int = x * x -} + override def select: Set[Int] = { + tables(data).map { case (featureIndex, contTable) => + (featureIndex, compute(contTable))}.collect().sortBy(-_._2).take(numTopFeatures).unzip._1.toSet + } +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala index 43ea7c0a3c14b..68a1f7108069e 
100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -12,10 +12,10 @@ import org.scalatest.FunSuite class ChiSquaredSuite extends FunSuite with LocalSparkContext { def labeledData = sc.parallelize( - Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 9.0, 0.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 5.0, 6.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 7.0, 8.0))), - new LabeledPoint(2.0, Vectors.dense(Array(8.0, 4.0, 5.0))) + Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 7.0, 0.0))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 6.0))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), + new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) ), 2) /* @@ -35,59 +35,11 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext { * ^feature_2||1|0||0|1||0|1| * */ - test("Feature class combinations test") { - val indexByLabelMap = Map((0.0 -> 0), (1.0 -> 1), (2.0 -> 2)) - val featureLabelCombinations = sc.parallelize( - Seq( (0 -> Array((1, 0), (0, 2), (1, 0))), - (1 -> Array((1, 0), (2, 0), (1, 0))), - (2 -> Array((0, 1), (2, 0), (1, 0))) - ), 2) - class CombinationsCalculatorTest(labeledData: RDD[LabeledPoint]) extends CombinationsCalculator { - def combinations = featureLabelCombinations(labeledData) - def index = indexByLabelMap(labeledData) - } - val combinationsTest = new CombinationsCalculatorTest(labeledData) - assert(indexByLabelMap == combinationsTest.index) - val flc = featureLabelCombinations.collectAsMap() - assert(combinationsTest.combinations.collectAsMap().forall { - case (key, value) => flc(key).deep.sameElements(value) - }) - } - private def sqr(x: Int) = x * x + test("Chi Squared test") { - test("Chi Squared feature selection test") { - val chi2ValuesByClass = sc.parallelize( - Seq( ((0, 0.0), - (1 + 1 + 0 + 2) * sqr(1 * 2 - 1 * 0).toDouble / - ((1 + 0) * (1 + 1) * (1 + 2) * (0 + 2))), - ((0, 1.0), - (0 + 2 + 2 + 0) * sqr(0 * 0 - 2 * 2).toDouble / - ((0 + 2) * (0 + 2) * (0 + 2) * (0 + 2))), - ((0, 2.0), - (1 + 1 + 0 + 2) * sqr(1 * 2 - 1 * 0).toDouble / - ((1 + 0) * (1 + 1) * (2 + 0) * (2 + 1))), - ((1, 0.0), - (1 + 2 + 0 + 0) * sqr(1 * 0 - 2 * 0).toDouble), - ((1, 1.0), - (2 + 2 + 0 + 0) * sqr(2 * 0 - 2 * 0).toDouble), - ((1, 2.0), - (1 + 3 + 0 + 0) * sqr(1 * 0 - 3 * 0).toDouble), - ((2, 0.0), - (0 + 3 + 1 + 0) * sqr(0 * 0 - 3 * 1).toDouble / - ((0 + 1) * (0 + 3) * (0 + 1) * (0 + 3))), - ((2, 1.0), - (2 + 1 + 1 + 0) * sqr(2 * 1 - 1 * 0).toDouble / - ((2 + 0) * (2 + 1) * (1 + 0) * (1 + 1))), - ((2, 2.0), - (1 + 2 + 1 + 0) * sqr(1 * 1 - 2 * 0).toDouble / - ((1 + 0) * (1 + 2) * (1 + 0) * (1 + 2))) - ), 2) - val chiSquared = new ChiSquared(labeledData) - val c2vbc = chi2ValuesByClass.collectAsMap() - assert(chiSquared.chiSquaredValues.collectAsMap().forall { - case (key, value) => c2vbc(key) == value - }) + val chi = new ChiSquaredFeatureSelection(labeledData, 2) + chi.filter.foreach(println) } From 150a3e0c552a4086edb71e512e6fa30893c8eea7 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 18 Jul 2014 13:41:05 +0400 Subject: [PATCH 09/16] Scala style fix --- .../spark/mllib/feature/ChiSquared.scala | 41 ++++++++++------- .../spark/mllib/feature/ChiSquaredSuite.scala | 44 +++++++++++++------ 2 files changed, 54 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 
e7bc6fa24472f..15563be693285 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -22,7 +22,7 @@ private[feature] trait ContingencyTableCalculator extends java.io.Serializable { ).collectAsMap() } - def tables(discreteData: RDD[LabeledPoint]): RDD[(Int, Array[Array[Int]])] = { + protected def tables(discreteData: RDD[LabeledPoint]): RDD[(Int, Array[Array[Int]])] = { val indexByLabel = indexByLabelMap(discreteData) val classesCount = indexByLabel.size val valuesByIndex = enumerateValues(discreteData) @@ -30,7 +30,7 @@ private[feature] trait ContingencyTableCalculator extends java.io.Serializable { labeledPoint => labeledPoint.features.toArray.zipWithIndex.map { case (featureValue, featureIndex) => - /** array of feature presence/absence in a class */ + /** array of feature value presence in a class */ val featureValues = valuesByIndex(featureIndex) val valuesCount = featureValues.size val featureValueIndex = featureValues.indexOf(featureValue) @@ -41,38 +41,45 @@ private[feature] trait ContingencyTableCalculator extends java.io.Serializable { } }.reduceByKey { case (x, y) => - x.zip(y).map { case(row1, row2) => row1.zip(row2).map{ case(a, b) => a + b} } + x.zip(y).map { case (row1, row2) => + row1.zip(row2).map { case (a, b) => + a + b}} } } } -private[feature] trait ChiSquared { +private[feature] object ChiSquared { - def compute(contingencyTable: Array[Array[Int]]): Double = { + def apply(contingencyTable: Array[Array[Int]]): Double = { /** probably use List instead of Array */ val columns = contingencyTable(0).size - val rowSums = contingencyTable.map( row => row.sum) - val columnSums = contingencyTable.fold(Array.ofDim[Int](columns)){ case (ri, rj) => - ri.zip(rj).map { case (a1, b1) - => a1 + b1}} + val rowSums = contingencyTable.map(row => + row.sum) + val columnSums = contingencyTable.fold(Array.ofDim[Int](columns)) { case (ri, rj) => + ri.zip(rj).map { case (a1, b1) => + a1 + b1 + } + } val tableSum = rowSums.sum - val rowRatios = rowSums.map( _.toDouble / tableSum) - val expectedTable = rowRatios.map( a => columnSums.map(_ * a)) - val chi2 = contingencyTable.zip(expectedTable).foldLeft(0.0){ case(sum, (obsRow, expRow)) => - obsRow.zip(expRow).map{ case (oValue, eValue) => - sqr(oValue - eValue) / eValue}.sum + sum} - chi2 + val rowRatios = rowSums.map(_.toDouble / tableSum) + val expectedTable = rowRatios.map(a => columnSums.map(_ * a)) + contingencyTable.zip(expectedTable).foldLeft(0.0) { case (sum, (obsRow, expRow)) => + obsRow.zip(expRow).map { case (oValue, eValue) => + sqr(oValue - eValue) / eValue + }.sum + sum + } } private def sqr(x: Double): Double = x * x } class ChiSquaredFeatureSelection(labeledData: RDD[LabeledPoint], numTopFeatures: Int) extends java.io.Serializable -with LabeledPointFeatureFilter with ContingencyTableCalculator with ChiSquared { +with LabeledPointFeatureFilter with ContingencyTableCalculator { override def data: RDD[LabeledPoint] = labeledData override def select: Set[Int] = { tables(data).map { case (featureIndex, contTable) => - (featureIndex, compute(contTable))}.collect().sortBy(-_._2).take(numTopFeatures).unzip._1.toSet + (featureIndex, ChiSquared(contTable)) + }.collect().sortBy(-_._2).take(numTopFeatures).unzip._1.toSet } } \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala index 
68a1f7108069e..09f374a0c68a9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -4,11 +4,17 @@ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.{MLUtils, LocalSparkContext} import org.scalatest.FunSuite +class ChiTest(data: RDD[LabeledPoint]) extends java.io.Serializable +with ContingencyTableCalculator { + val chi = tables(data).map { case ( fIndex, table) + => (fIndex, ChiSquared(table)) }.collect().sortBy(-_._2) +} + class ChiSquaredSuite extends FunSuite with LocalSparkContext { def labeledData = sc.parallelize( @@ -19,20 +25,23 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext { ), 2) /* - * feature# 0 1 2 - * presence: Y/N Y/N Y/N - * class_0|1|0||1|0||0|1 - * class_1|0|2||2|0||2|0 - * class_2|1|0||1|0||1|0 + * Contingency tables + * feature0 = {8.0, 0.0} + * class 0 1 2 + * 8.0||1|0|1| + * 0.0||0|2|0| + * + * feature1 = {7.0, 9.0} + * class 0 1 2 + * 7.0||1|0|0| + * 9.0||0|2|1| * - * class# 0 1 2 - * presence: Y/N Y/N Y/N - * feature_0||1|1||0|2||1|1| - * ^feature_0||0|2||2|0||0|2| - * feature_1||1|2||2|2||1|3| - * ^feature_1||0|0||0|0||0|0| - * feature_2||0|3||2|1||1|2| - * ^feature_2||1|0||0|1||0|1| + * feature2 = {0.0, 6.0, 8.0, 5.0} + * class 0 1 2 + * 0.0||1|0|0| + * 6.0||0|1|0| + * 8.0||0|1|0| + * 5.0||0|0|1| * */ @@ -44,4 +53,11 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext { } + test("Big test") { + val dData = MLUtils.loadLibSVMFile(sc, "c:/ulanov/res/indigo/data-test-spark.libsvm", true) + val chiTest = new ChiTest(dData) + chiTest.chi.foreach(println) + } + + } From 80363ca24104fb5a6c0ea57d5bc03aa469cdc865 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 18 Jul 2014 15:10:48 +0400 Subject: [PATCH 10/16] Tests, comments, apache headers and scala style --- .../spark/mllib/feature/ChiSquared.scala | 70 +++++++++++++++++-- .../mllib/feature/FeatureSelection.scala | 60 ++++++++++++++++ .../spark/mllib/feature/ChiSquaredSuite.scala | 50 +++++++------ 3 files changed, 151 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala index 15563be693285..9ebd1e40bb447 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala @@ -1,16 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.mllib.feature import org.apache.spark.SparkContext._ +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +/** + * :: Experimental :: + * Contingency feature-class table calculator for discrete features + */ +@Experimental private[feature] trait ContingencyTableCalculator extends java.io.Serializable { + /** + * Returns the map of label to index + * @param discreteData labeled discrete data + */ private def indexByLabelMap(discreteData: RDD[LabeledPoint]) = { discreteData.map(labeledPoint => labeledPoint.label).distinct.collect.sorted.zipWithIndex.toMap } + /** + * Returns a map of feature index to the list of its values + * @param discreteData labeled discrete data + */ private def enumerateValues(discreteData: RDD[LabeledPoint]) = { discreteData.flatMap(labeledPoint => (0 to labeledPoint.features.size).zip(labeledPoint.features.toArray)) @@ -22,6 +53,10 @@ private[feature] trait ContingencyTableCalculator extends java.io.Serializable { ).collectAsMap() } + /** + * Returns an RDD of feature (index, contingency table) pairs + * @param discreteData labeled discrete data + */ protected def tables(discreteData: RDD[LabeledPoint]): RDD[(Int, Array[Array[Int]])] = { val indexByLabel = indexByLabelMap(discreteData) val classesCount = indexByLabel.size @@ -48,10 +83,19 @@ private[feature] trait ContingencyTableCalculator extends java.io.Serializable { } } +/** + * :: Experimental :: + * Calculates chi-squared value based on contingency table + */ +@Experimental private[feature] object ChiSquared { + /** + * Returns chi-squared value for a given contingency table + * @param contingencyTable contingency table + */ def apply(contingencyTable: Array[Array[Int]]): Double = { - /** probably use List instead of Array */ + /** probably use List instead of Array or breeze matrices */ val columns = contingencyTable(0).size val rowSums = contingencyTable.map(row => row.sum) @@ -62,7 +106,8 @@ private[feature] object ChiSquared { } val tableSum = rowSums.sum val rowRatios = rowSums.map(_.toDouble / tableSum) - val expectedTable = rowRatios.map(a => columnSums.map(_ * a)) + val expectedTable = rowRatios.map(a => + columnSums.map(_ * a)) contingencyTable.zip(expectedTable).foldLeft(0.0) { case (sum, (obsRow, expRow)) => obsRow.zip(expRow).map { case (oValue, eValue) => sqr(oValue - eValue) / eValue @@ -70,16 +115,29 @@ private[feature] object ChiSquared { } } + /** + * Returns squared value + * @param x value + */ private def sqr(x: Double): Double = x * x } -class ChiSquaredFeatureSelection(labeledData: RDD[LabeledPoint], numTopFeatures: Int) extends java.io.Serializable -with LabeledPointFeatureFilter with ContingencyTableCalculator { - override def data: RDD[LabeledPoint] = labeledData +/** + * :: Experimental :: + * Performes chi-square feature selection for a given discrete dataset. 
+ * Filters a given number of features sorted by chi-squared descending + * @param labeledDiscreteData data + * @param numTopFeatures top N features sorted by chi-square to keep + */ +@Experimental +class ChiSquaredFeatureSelection(labeledDiscreteData: RDD[LabeledPoint], numTopFeatures: Int) + extends java.io.Serializable with LabeledPointFeatureFilter with ContingencyTableCalculator { + + override def data: RDD[LabeledPoint] = labeledDiscreteData override def select: Set[Int] = { tables(data).map { case (featureIndex, contTable) => (featureIndex, ChiSquared(contTable)) }.collect().sortBy(-_._2).take(numTopFeatures).unzip._1.toSet } -} \ No newline at end of file +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala index 4feccc1816eb0..6046c42d9fce3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala @@ -1,26 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.mllib.feature +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +/** + * :: Experimental :: + * Generic trait for feature selection + */ +@Experimental trait FeatureSelection[T] extends java.io.Serializable { + + /** + * Returns data that will be process by feature selection + */ def data: RDD[T] + + /** + * Returns the set of feature indexes given the feature selection criteria + */ def select: Set[Int] } +/** + * :: Experimental :: + * Generic trait for feature selection and filter + */ +@Experimental sealed trait FeatureFilter[T] extends FeatureSelection[T] { + + /** + * Returns a dataset with features filtered + */ def filter: RDD[T] } +/** + * :: Experimental :: + * Feature selection and filter trait that processes LabeledPoints + */ +@Experimental trait LabeledPointFeatureFilter extends FeatureFilter[LabeledPoint] { + + /** + * Returns a dataset with features filtered + */ def filter: RDD[LabeledPoint] = { val indices = select data.map { lp => new LabeledPoint(lp.label, Compress(lp.features, indices)) } } } +/** + * :: Experimental :: + * Filters features in a given vector + */ +@Experimental object Compress { + + /** + * Returns a vector with features filtered + * @param features vector + * @param indexes indexes of features to filter + */ def apply(features: Vector, indexes: Set[Int]): Vector = { val (values, _) = features.toArray.zipWithIndex.filter { case (value, index) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala index 09f374a0c68a9..c1f865e9f79f9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala @@ -1,27 +1,26 @@ package org.apache.spark.mllib.feature -import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.util.{MLUtils, LocalSparkContext} +import org.apache.spark.mllib.util.LocalSparkContext import org.scalatest.FunSuite -class ChiTest(data: RDD[LabeledPoint]) extends java.io.Serializable -with ContingencyTableCalculator { - val chi = tables(data).map { case ( fIndex, table) - => (fIndex, ChiSquared(table)) }.collect().sortBy(-_._2) -} - class ChiSquaredSuite extends FunSuite with LocalSparkContext { - def labeledData = sc.parallelize( + private class ChiTest(data: RDD[LabeledPoint]) + extends java.io.Serializable with ContingencyTableCalculator { + val chi = tables(data).map { case ( fIndex, table) => + (fIndex, ChiSquared(table)) }.collect() + } + + lazy val labeledDiscreteData = sc.parallelize( Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 7.0, 0.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 6.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), - new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 6.0))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), + new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) ), 2) /* @@ -43,21 +42,26 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext { * 8.0||0|1|0| * 5.0||0|0|1| * + * Use chi-squared calculator from Internet */ - test("Chi Squared test") { - - val chi 
-    val chi = new ChiSquaredFeatureSelection(labeledData, 2)
-    chi.filter.foreach(println)
+  test("Chi Squared values and contingency tables test") {
+    val preComputedChi2 = Map( (0 -> 4.0), (1 -> 4.0), (2 -> 8.0))
+    val computedChi2 = new ChiTest(labeledDiscreteData).chi
+    val delta = 0.000001
+    assert(computedChi2.forall{ case (featureIndex, chi2) =>
+      math.abs(preComputedChi2(featureIndex) - chi2) <= delta})
   }
-
-  test("Big test") {
-    val dData = MLUtils.loadLibSVMFile(sc, "c:/ulanov/res/indigo/data-test-spark.libsvm", true)
-    val chiTest = new ChiTest(dData)
-    chiTest.chi.foreach(println)
+  test("Chi Squared feature selection test") {
+    val preFilteredData =
+      Set( new LabeledPoint(0.0, Vectors.dense(Array(0.0))),
+           new LabeledPoint(1.0, Vectors.dense(Array(6.0))),
+           new LabeledPoint(1.0, Vectors.dense(Array(8.0))),
+           new LabeledPoint(2.0, Vectors.dense(Array(5.0)))
+      )
+    val filteredData = new ChiSquaredFeatureSelection(labeledDiscreteData, 1).filter.collect.toSet
+    assert(filteredData == preFilteredData)
   }
-
-
 }

From f9b070ad1c442e6331ce13c08e05fe5a3c50936a Mon Sep 17 00:00:00 2001
From: Alexander Ulanov
Date: Fri, 18 Jul 2014 15:40:19 +0400
Subject: [PATCH 11/16] Adding Apache header in tests...

---
 .../spark/mllib/feature/ChiSquaredSuite.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala
index c1f865e9f79f9..d1d30caf400bd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.mllib.feature

 import org.apache.spark.mllib.linalg.Vectors

From 427ca4eedffd21b4e62e8de55c92a6b3fea45240 Mon Sep 17 00:00:00 2001
From: Alexander Ulanov
Date: Thu, 13 Nov 2014 14:02:31 -0800
Subject: [PATCH 12/16] Addressing reviewers comments: implement VectorTransformer interface, use Statistics.chiSqTest

---
 ...ureSelection.scala => ChiSqSelector.scala} | 57 ++++---
 .../spark/mllib/feature/ChiSquared.scala      | 143 ------------------
 ...edSuite.scala => ChiSqSelectorSuite.scala} | 26 +---
 3 files changed, 31 insertions(+), 195 deletions(-)
 rename mllib/src/main/scala/org/apache/spark/mllib/feature/{FeatureSelection.scala => ChiSqSelector.scala} (59%)
 delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala
 rename mllib/src/test/scala/org/apache/spark/mllib/feature/{ChiSquaredSuite.scala => ChiSqSelectorSuite.scala} (72%)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
similarity index 59%
rename from mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala
rename to mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 6046c42d9fce3..75917cf38e655 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/FeatureSelection.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -18,54 +18,49 @@ package org.apache.spark.mllib.feature

 import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD

 /**
  * :: Experimental ::
- * Generic trait for feature selection
- */
-@Experimental
-trait FeatureSelection[T] extends java.io.Serializable {
-
-  /**
-   * Returns data that will be processed by feature selection
-   */
-  def data: RDD[T]
-
-  /**
-   * Returns the set of feature indexes given the feature selection criteria
-   */
-  def select: Set[Int]
-}
-
-/**
- * :: Experimental ::
- * Generic trait for feature selection and filtering
+ * Chi Squared selector model.
+ *
+ * @param indices list of indices to select (filter)
  */
 @Experimental
-sealed trait FeatureFilter[T] extends FeatureSelection[T] {
-
+class ChiSqSelectorModel(indices: IndexedSeq[Int]) extends VectorTransformer {
   /**
-   * Returns a dataset with features filtered
+   * Applies transformation on a vector.
+   *
+   * @param vector vector to be transformed.
+   * @return transformed vector.
    */
-  def filter: RDD[T]
+  override def transform(vector: linalg.Vector): linalg.Vector = {
+    Compress(vector, indices)
+  }
 }

 /**
  * :: Experimental ::
- * Feature selection and filtering trait that processes LabeledPoints
+ * Creates a ChiSquared feature selector.
  */
 @Experimental
-trait LabeledPointFeatureFilter extends FeatureFilter[LabeledPoint] {
+object ChiSqSelector {

   /**
-   * Returns a dataset with features filtered
+   * Returns a ChiSquared feature selector.
+   *
+   * @param data data used to compute the Chi Squared statistic.
+ * @param numTopFeatures number of features that selector will select + * (ordered by statistic value descending) */ - def filter: RDD[LabeledPoint] = { - val indices = select - data.map { lp => new LabeledPoint(lp.label, Compress(lp.features, indices)) } + def fit(data: RDD[LabeledPoint], numTopFeatures: Int): ChiSqSelectorModel = { + val (_, indices) = Statistics.chiSqTest(data).zipWithIndex.sortBy{ case(res, index) => + -res.statistic}.take(numTopFeatures).unzip + new ChiSqSelectorModel(indices) } } @@ -75,13 +70,12 @@ trait LabeledPointFeatureFilter extends FeatureFilter[LabeledPoint] { */ @Experimental object Compress { - /** * Returns a vector with features filtered * @param features vector * @param indexes indexes of features to filter */ - def apply(features: Vector, indexes: Set[Int]): Vector = { + def apply(features: Vector, indexes: IndexedSeq[Int]): Vector = { val (values, _) = features.toArray.zipWithIndex.filter { case (value, index) => indexes.contains(index)}.unzip @@ -89,3 +83,4 @@ object Compress { Vectors.dense(values.toArray) } } + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala deleted file mode 100644 index 9ebd1e40bb447..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSquared.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.feature - -import org.apache.spark.SparkContext._ -import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.rdd.RDD - -/** - * :: Experimental :: - * Contingency feature-class table calculator for discrete features - */ -@Experimental -private[feature] trait ContingencyTableCalculator extends java.io.Serializable { - - /** - * Returns the map of label to index - * @param discreteData labeled discrete data - */ - private def indexByLabelMap(discreteData: RDD[LabeledPoint]) = { - discreteData.map(labeledPoint => - labeledPoint.label).distinct.collect.sorted.zipWithIndex.toMap - } - - /** - * Returns a map of feature index to the list of its values - * @param discreteData labeled discrete data - */ - private def enumerateValues(discreteData: RDD[LabeledPoint]) = { - discreteData.flatMap(labeledPoint => - (0 to labeledPoint.features.size).zip(labeledPoint.features.toArray)) - .distinct() - .combineByKey[List[Double]]( - createCombiner = (value: Double) => List(value), - mergeValue = (c: List[Double], value: Double) => value :: c, - mergeCombiners = (c1: List[Double], c2: List[Double]) => c1 ::: c2 - ).collectAsMap() - } - - /** - * Returns an RDD of feature (index, contingency table) pairs - * @param discreteData labeled discrete data - */ - protected def tables(discreteData: RDD[LabeledPoint]): RDD[(Int, Array[Array[Int]])] = { - val indexByLabel = indexByLabelMap(discreteData) - val classesCount = indexByLabel.size - val valuesByIndex = enumerateValues(discreteData) - discreteData.flatMap { - labeledPoint => - labeledPoint.features.toArray.zipWithIndex.map { - case (featureValue, featureIndex) => - /** array of feature value presence in a class */ - val featureValues = valuesByIndex(featureIndex) - val valuesCount = featureValues.size - val featureValueIndex = featureValues.indexOf(featureValue) - val labelIndex = indexByLabel(labeledPoint.label) - val counts = Array.ofDim[Int](valuesCount, classesCount) - counts(featureValueIndex)(labelIndex) = 1 - (featureIndex, counts) - } - }.reduceByKey { - case (x, y) => - x.zip(y).map { case (row1, row2) => - row1.zip(row2).map { case (a, b) => - a + b}} - } - } -} - -/** - * :: Experimental :: - * Calculates chi-squared value based on contingency table - */ -@Experimental -private[feature] object ChiSquared { - - /** - * Returns chi-squared value for a given contingency table - * @param contingencyTable contingency table - */ - def apply(contingencyTable: Array[Array[Int]]): Double = { - /** probably use List instead of Array or breeze matrices */ - val columns = contingencyTable(0).size - val rowSums = contingencyTable.map(row => - row.sum) - val columnSums = contingencyTable.fold(Array.ofDim[Int](columns)) { case (ri, rj) => - ri.zip(rj).map { case (a1, b1) => - a1 + b1 - } - } - val tableSum = rowSums.sum - val rowRatios = rowSums.map(_.toDouble / tableSum) - val expectedTable = rowRatios.map(a => - columnSums.map(_ * a)) - contingencyTable.zip(expectedTable).foldLeft(0.0) { case (sum, (obsRow, expRow)) => - obsRow.zip(expRow).map { case (oValue, eValue) => - sqr(oValue - eValue) / eValue - }.sum + sum - } - } - - /** - * Returns squared value - * @param x value - */ - private def sqr(x: Double): Double = x * x -} - -/** - * :: Experimental :: - * Performes chi-square feature selection for a given discrete dataset. 
- * Filters a given number of features sorted by chi-squared descending
- * @param labeledDiscreteData data
- * @param numTopFeatures top N features sorted by chi-square to keep
- */
-@Experimental
-class ChiSquaredFeatureSelection(labeledDiscreteData: RDD[LabeledPoint], numTopFeatures: Int)
-  extends java.io.Serializable with LabeledPointFeatureFilter with ContingencyTableCalculator {
-
-  override def data: RDD[LabeledPoint] = labeledDiscreteData
-
-  override def select: Set[Int] = {
-    tables(data).map { case (featureIndex, contTable) =>
-      (featureIndex, ChiSquared(contTable))
-    }.collect().sortBy(-_._2).take(numTopFeatures).unzip._1.toSet
-  }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala
similarity index 72%
rename from mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala
rename to mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala
index d1d30caf400bd..788ff651f66ec 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSquaredSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala
@@ -19,19 +19,10 @@ package org.apache.spark.mllib.feature

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.util.LocalSparkContext
-
-
 import org.scalatest.FunSuite

-class ChiSquaredSuite extends FunSuite with LocalSparkContext {
-
-  private class ChiTest(data: RDD[LabeledPoint])
-    extends java.io.Serializable with ContingencyTableCalculator {
-    val chi = tables(data).map { case ( fIndex, table) =>
-      (fIndex, ChiSquared(table)) }.collect()
-  }
+class ChiSqSelectorSuite extends FunSuite with LocalSparkContext {

   lazy val labeledDiscreteData = sc.parallelize(
     Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 7.0, 0.0))),
@@ -62,23 +53,16 @@ class ChiSquaredSuite extends FunSuite with LocalSparkContext {
    * Use chi-squared calculator from Internet
    */

-  test("Chi Squared values and contingency tables test") {
-    val preComputedChi2 = Map( (0 -> 4.0), (1 -> 4.0), (2 -> 8.0))
-    val computedChi2 = new ChiTest(labeledDiscreteData).chi
-    val delta = 0.000001
-    assert(computedChi2.forall{ case (featureIndex, chi2) =>
-      math.abs(preComputedChi2(featureIndex) - chi2) <= delta})
-
-  }
-
-  test("Chi Squared feature selection test") {
+  test("ChiSqSelector transform test") {
     val preFilteredData =
       Set( new LabeledPoint(0.0, Vectors.dense(Array(0.0))),
           new LabeledPoint(1.0, Vectors.dense(Array(6.0))),
           new LabeledPoint(1.0, Vectors.dense(Array(8.0))),
           new LabeledPoint(2.0, Vectors.dense(Array(5.0)))
       )
-    val filteredData = new ChiSquaredFeatureSelection(labeledDiscreteData, 1).filter.collect.toSet
+    val model = ChiSqSelector.fit(labeledDiscreteData, 1)
+    val filteredData = labeledDiscreteData.map(lp =>
+      new LabeledPoint(lp.label, model.transform(lp.features))).collect().toSet
     assert(filteredData == preFilteredData)
   }
 }

From 010acff69f72a123e8b9b69ad5ee317545cb7f3a Mon Sep 17 00:00:00 2001
From: Alexander Ulanov
Date: Fri, 9 Jan 2015 09:58:34 -0800
Subject: [PATCH 13/16] Rebase

---
 .../org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala
index 788ff651f66ec..90e3e793b8c24 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala @@ -19,10 +19,10 @@ package org.apache.spark.mllib.feature import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.MLlibTestSparkContext import org.scalatest.FunSuite -class ChiSqSelectorSuite extends FunSuite with LocalSparkContext { +class ChiSqSelectorSuite extends FunSuite with MLlibTestSparkContext { lazy val labeledDiscreteData = sc.parallelize( Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 7.0, 0.0))), From 714b878037b70ae1d19d2fac72416a8382c77f1e Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 30 Jan 2015 12:39:55 -0800 Subject: [PATCH 14/16] Addressing reviewers comments @mengxr --- .../spark/mllib/feature/ChiSqSelector.scala | 66 ++++++++++++++----- .../mllib/feature/ChiSqSelectorSuite.scala | 27 ++++---- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index 75917cf38e655..e98c142894f5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -18,8 +18,7 @@ package org.apache.spark.mllib.feature import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg -import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD @@ -31,14 +30,14 @@ import org.apache.spark.rdd.RDD * @param indices list of indices to select (filter) */ @Experimental -class ChiSqSelectorModel(indices: IndexedSeq[Int]) extends VectorTransformer { +class ChiSqSelectorModel(indices: Array[Int]) extends VectorTransformer { /** * Applies transformation on a vector. * * @param vector vector to be transformed. * @return transformed vector. */ - override def transform(vector: linalg.Vector): linalg.Vector = { + override def transform(vector: Vector): Vector = { Compress(vector, indices) } } @@ -46,20 +45,22 @@ class ChiSqSelectorModel(indices: IndexedSeq[Int]) extends VectorTransformer { /** * :: Experimental :: * Creates a ChiSquared feature selector. + * @param numTopFeatures number of features that selector will select + * (ordered by statistic value descending) */ @Experimental -object ChiSqSelector { +class ChiSqSelector (val numTopFeatures: Int) { /** * Returns a ChiSquared feature selector. * * @param data data used to compute the Chi Squared statistic. 
- * @param numTopFeatures number of features that selector will select - * (ordered by statistic value descending) */ - def fit(data: RDD[LabeledPoint], numTopFeatures: Int): ChiSqSelectorModel = { - val (_, indices) = Statistics.chiSqTest(data).zipWithIndex.sortBy{ case(res, index) => - -res.statistic}.take(numTopFeatures).unzip + def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = { + val indices = Statistics.chiSqTest(data) + .zipWithIndex.sortBy { case(res, _) => -res.statistic } + .take(numTopFeatures) + .map{ case(_, indices) => indices } new ChiSqSelectorModel(indices) } } @@ -71,16 +72,45 @@ object ChiSqSelector { @Experimental object Compress { /** - * Returns a vector with features filtered + * Returns a vector with features filtered. + * Preserves the order of filtered features the same as their indices are stored. * @param features vector - * @param indexes indexes of features to filter + * @param filterIndices indices of features to filter */ - def apply(features: Vector, indexes: IndexedSeq[Int]): Vector = { - val (values, _) = - features.toArray.zipWithIndex.filter { case (value, index) => - indexes.contains(index)}.unzip - /** probably make a sparse vector if it was initially sparse */ - Vectors.dense(values.toArray) + def apply(features: Vector, filterIndices: Array[Int]): Vector = { + features match { + case SparseVector(size, indices, values) => + val filterMap = filterIndices.zipWithIndex.toMap + val newSize = filterIndices.length + var k = 0 + var intersectionSize = 0 + while (k < indices.length) { + if( filterMap.contains(indices(k))) { + intersectionSize += 1 + } + k += 1 + } + val newIndices = new Array[Int](intersectionSize) + val newValues = new Array[Double](intersectionSize) + k = 0 + var m = 0 + while (k < indices.length) { + if( filterMap.contains(indices(k))) { + newIndices(m) = filterMap(indices(k)) + newValues(m) = values(k) + m += 1 + } + k += 1 + } + /** Sparse representation might be ineffective if newIndices is small */ + Vectors.sparse(newSize, newIndices, newValues) + case DenseVector(values) => + val values = features.toArray + Vectors.dense(filterIndices.map(i => values(i))) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala index 90e3e793b8c24..05864aeb9f0f8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala @@ -17,20 +17,14 @@ package org.apache.spark.mllib.feature +import org.scalatest.FunSuite + import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.scalatest.FunSuite class ChiSqSelectorSuite extends FunSuite with MLlibTestSparkContext { - lazy val labeledDiscreteData = sc.parallelize( - Seq( new LabeledPoint(0.0, Vectors.dense(Array(8.0, 7.0, 0.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 6.0))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), - new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) - ), 2) - /* * Contingency tables * feature0 = {8.0, 0.0} @@ -53,16 +47,23 @@ class ChiSqSelectorSuite extends FunSuite with MLlibTestSparkContext { * Use chi-squared calculator from Internet */ - test("ChiSqSelector transform 
test") { + test("ChiSqSelector transform test (sparse & dense vector)") { + val labeledDiscreteData = sc.parallelize( + Seq(new LabeledPoint(0.0, Vectors.sparse(3, Array((0, 8.0), (1, 7.0)))), + new LabeledPoint(1.0, Vectors.sparse(3, Array((1, 9.0), (2, 6.0)))), + new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), + new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) + ), 2) val preFilteredData = - Set( new LabeledPoint(0.0, Vectors.dense(Array(0.0))), + Set(new LabeledPoint(0.0, Vectors.dense(Array(0.0))), new LabeledPoint(1.0, Vectors.dense(Array(6.0))), new LabeledPoint(1.0, Vectors.dense(Array(8.0))), new LabeledPoint(2.0, Vectors.dense(Array(5.0))) ) - val model = ChiSqSelector.fit(labeledDiscreteData, 1) - val filteredData = labeledDiscreteData.map(lp => - new LabeledPoint(lp.label, model.transform(lp.features))).collect().toSet + val model = new ChiSqSelector(1).fit(labeledDiscreteData) + val filteredData = labeledDiscreteData.map { lp => + new LabeledPoint(lp.label, model.transform(lp.features)) + }.collect().toSet assert(filteredData == preFilteredData) } } From a6ad82af0d81cfcba022fcaa1831153a1f2c36d9 Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Fri, 30 Jan 2015 16:36:29 -0800 Subject: [PATCH 15/16] Addressing reviewers comments @mengxr --- .../spark/mllib/feature/ChiSqSelector.scala | 99 +++++++++---------- 1 file changed, 46 insertions(+), 53 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index e98c142894f5d..d1bc3c0422457 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -23,14 +23,16 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD +import scala.collection.mutable.ArrayBuilder + /** * :: Experimental :: * Chi Squared selector model. * - * @param indices list of indices to select (filter) + * @param indices list of indices to select (filter). Must be ordered asc */ @Experimental -class ChiSqSelectorModel(indices: Array[Int]) extends VectorTransformer { +class ChiSqSelectorModel private[mllib] (indices: Array[Int]) extends VectorTransformer { /** * Applies transformation on a vector. * @@ -38,7 +40,47 @@ class ChiSqSelectorModel(indices: Array[Int]) extends VectorTransformer { * @return transformed vector. */ override def transform(vector: Vector): Vector = { - Compress(vector, indices) + compress(vector, indices) + } + + /** + * Returns a vector with features filtered. + * Preserves the order of filtered features the same as their indices are stored. 
+ * Might be moved to Vector as .slice + * @param features vector + * @param filterIndices indices of features to filter, must be ordered asc + */ + private def compress(features: Vector, filterIndices: Array[Int]): Vector = { + features match { + case SparseVector(size, indices, values) => + val newSize = filterIndices.length + val newValues = new ArrayBuilder.ofDouble + val newIndices = new ArrayBuilder.ofInt + var i: Int = 0 + var j: Int = 0 + while(i < indices.length && j < filterIndices.length) { + if(indices(i) == filterIndices(j)) { + newIndices += j + newValues += values(i) + j += 1 + i += 1 + } else { + if(indices(i) > filterIndices(j)) { + j += 1 + } else { + i += 1 + } + } + } + /** Sparse representation might be ineffective if (newSize ~= newValues.size) */ + Vectors.sparse(newSize, newIndices.result(), newValues.result()) + case DenseVector(values) => + val values = features.toArray + Vectors.dense(filterIndices.map(i => values(i))) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } } } @@ -61,56 +103,7 @@ class ChiSqSelector (val numTopFeatures: Int) { .zipWithIndex.sortBy { case(res, _) => -res.statistic } .take(numTopFeatures) .map{ case(_, indices) => indices } + .sorted new ChiSqSelectorModel(indices) } } - -/** - * :: Experimental :: - * Filters features in a given vector - */ -@Experimental -object Compress { - /** - * Returns a vector with features filtered. - * Preserves the order of filtered features the same as their indices are stored. - * @param features vector - * @param filterIndices indices of features to filter - */ - def apply(features: Vector, filterIndices: Array[Int]): Vector = { - features match { - case SparseVector(size, indices, values) => - val filterMap = filterIndices.zipWithIndex.toMap - val newSize = filterIndices.length - var k = 0 - var intersectionSize = 0 - while (k < indices.length) { - if( filterMap.contains(indices(k))) { - intersectionSize += 1 - } - k += 1 - } - val newIndices = new Array[Int](intersectionSize) - val newValues = new Array[Double](intersectionSize) - k = 0 - var m = 0 - while (k < indices.length) { - if( filterMap.contains(indices(k))) { - newIndices(m) = filterMap(indices(k)) - newValues(m) = values(k) - m += 1 - } - k += 1 - } - /** Sparse representation might be ineffective if newIndices is small */ - Vectors.sparse(newSize, newIndices, newValues) - case DenseVector(values) => - val values = features.toArray - Vectors.dense(filterIndices.map(i => values(i))) - case other => - throw new UnsupportedOperationException( - s"Only sparse and dense vectors are supported but got ${other.getClass}.") - } - } -} - From 755d358cece5fa09b912416e60ebd7c16f9be05b Mon Sep 17 00:00:00 2001 From: Alexander Ulanov Date: Mon, 2 Feb 2015 10:51:52 -0800 Subject: [PATCH 16/16] Addressing reviewers comments @mengxr --- .../spark/mllib/feature/ChiSqSelector.scala | 48 +++++++++++++------ .../mllib/feature/ChiSqSelectorSuite.scala | 20 ++++---- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index d1bc3c0422457..c6057c7f837b1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -17,22 +17,34 @@ package org.apache.spark.mllib.feature +import scala.collection.mutable.ArrayBuilder + import 
org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD -import scala.collection.mutable.ArrayBuilder - /** * :: Experimental :: * Chi Squared selector model. * - * @param indices list of indices to select (filter). Must be ordered asc + * @param selectedFeatures list of indices to select (filter). Must be ordered asc */ @Experimental -class ChiSqSelectorModel private[mllib] (indices: Array[Int]) extends VectorTransformer { +class ChiSqSelectorModel (val selectedFeatures: Array[Int]) extends VectorTransformer { + + require(isSorted(selectedFeatures), "Array has to be sorted asc") + + protected def isSorted(array: Array[Int]): Boolean = { + var i = 1 + while (i < array.length) { + if (array(i) < array(i-1)) return false + i += 1 + } + true + } + /** * Applies transformation on a vector. * @@ -40,7 +52,7 @@ class ChiSqSelectorModel private[mllib] (indices: Array[Int]) extends VectorTra * @return transformed vector. */ override def transform(vector: Vector): Vector = { - compress(vector, indices) + compress(vector, selectedFeatures) } /** @@ -56,23 +68,27 @@ class ChiSqSelectorModel private[mllib] (indices: Array[Int]) extends VectorTra val newSize = filterIndices.length val newValues = new ArrayBuilder.ofDouble val newIndices = new ArrayBuilder.ofInt - var i: Int = 0 - var j: Int = 0 - while(i < indices.length && j < filterIndices.length) { - if(indices(i) == filterIndices(j)) { + var i = 0 + var j = 0 + var indicesIdx = 0 + var filterIndicesIdx = 0 + while (i < indices.length && j < filterIndices.length) { + indicesIdx = indices(i) + filterIndicesIdx = filterIndices(j) + if (indicesIdx == filterIndicesIdx) { newIndices += j newValues += values(i) j += 1 i += 1 } else { - if(indices(i) > filterIndices(j)) { + if (indicesIdx > filterIndicesIdx) { j += 1 } else { i += 1 } } } - /** Sparse representation might be ineffective if (newSize ~= newValues.size) */ + // TODO: Sparse representation might be ineffective if (newSize ~= newValues.size) Vectors.sparse(newSize, newIndices.result(), newValues.result()) case DenseVector(values) => val values = features.toArray @@ -96,13 +112,15 @@ class ChiSqSelector (val numTopFeatures: Int) { /** * Returns a ChiSquared feature selector. * - * @param data data used to compute the Chi Squared statistic. + * @param data an `RDD[LabeledPoint]` containing the labeled dataset with categorical features. + * Real-valued features will be treated as categorical for each distinct value. + * Apply feature discretizer before using this function. 
*/ def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = { val indices = Statistics.chiSqTest(data) - .zipWithIndex.sortBy { case(res, _) => -res.statistic } + .zipWithIndex.sortBy { case (res, _) => -res.statistic } .take(numTopFeatures) - .map{ case(_, indices) => indices } + .map { case (_, indices) => indices } .sorted new ChiSqSelectorModel(indices) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala index 05864aeb9f0f8..747f5914598ec 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ChiSqSelectorSuite.scala @@ -49,20 +49,18 @@ class ChiSqSelectorSuite extends FunSuite with MLlibTestSparkContext { test("ChiSqSelector transform test (sparse & dense vector)") { val labeledDiscreteData = sc.parallelize( - Seq(new LabeledPoint(0.0, Vectors.sparse(3, Array((0, 8.0), (1, 7.0)))), - new LabeledPoint(1.0, Vectors.sparse(3, Array((1, 9.0), (2, 6.0)))), - new LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), - new LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0))) - ), 2) + Seq(LabeledPoint(0.0, Vectors.sparse(3, Array((0, 8.0), (1, 7.0)))), + LabeledPoint(1.0, Vectors.sparse(3, Array((1, 9.0), (2, 6.0)))), + LabeledPoint(1.0, Vectors.dense(Array(0.0, 9.0, 8.0))), + LabeledPoint(2.0, Vectors.dense(Array(8.0, 9.0, 5.0)))), 2) val preFilteredData = - Set(new LabeledPoint(0.0, Vectors.dense(Array(0.0))), - new LabeledPoint(1.0, Vectors.dense(Array(6.0))), - new LabeledPoint(1.0, Vectors.dense(Array(8.0))), - new LabeledPoint(2.0, Vectors.dense(Array(5.0))) - ) + Set(LabeledPoint(0.0, Vectors.dense(Array(0.0))), + LabeledPoint(1.0, Vectors.dense(Array(6.0))), + LabeledPoint(1.0, Vectors.dense(Array(8.0))), + LabeledPoint(2.0, Vectors.dense(Array(5.0)))) val model = new ChiSqSelector(1).fit(labeledDiscreteData) val filteredData = labeledDiscreteData.map { lp => - new LabeledPoint(lp.label, model.transform(lp.features)) + LabeledPoint(lp.label, model.transform(lp.features)) }.collect().toSet assert(filteredData == preFilteredData) }
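
A quick arithmetic check of the precomputed statistics asserted in the suites above (preComputedChi2), using the standard Pearson formula; this is a verification note, not code from the patches. For feature0 = {8.0, 0.0} over the labels {0.0, 1.0, 2.0} the observed contingency table is

    value \ label | 0.0 | 1.0 | 2.0 | row sum
    8.0           |  1  |  0  |  1  |    2
    0.0           |  0  |  2  |  0  |    2
    column sum    |  1  |  2  |  1  |    4

With expected counts E_{ij} = row_i * col_j / N, every expected cell is {0.5, 1.0, 0.5}, so

    \chi^2 = \sum_{i,j} (O_{ij} - E_{ij})^2 / E_{ij}
           = 0.5 + 1.0 + 0.5 + 0.5 + 1.0 + 0.5 = 4.0

which matches preComputedChi2(0) = 4.0; the same computation over feature1 and over the four distinct values of feature2 reproduces 4.0 and 8.0.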
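For reference, a minimal end-to-end sketch of the API as it stands after PATCH 16/16; the dataset and numTopFeatures = 1 mirror ChiSqSelectorSuite, and a live SparkContext named sc is assumed, as in the tests:

    import org.apache.spark.mllib.feature.ChiSqSelector
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Discrete features only: ChiSqSelector treats every distinct value as a category.
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(8.0, 7.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 6.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 8.0)),
      LabeledPoint(2.0, Vectors.dense(8.0, 9.0, 5.0))), 2)

    // fit() ranks features by chi-squared statistic; feature 2 (chi2 = 8.0) wins here.
    val model = new ChiSqSelector(1).fit(data)
    val reduced = data.map(lp => LabeledPoint(lp.label, model.transform(lp.features)))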
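The compress helper in the last two patches assumes that both the sparse vector's indices and the selected indices are sorted ascending, which is why fit() ends with .sorted and the model checks isSorted; this replaces PATCH 14's per-element hash-map lookup with a single linear merge pass. A standalone sketch of the same two-pointer intersection follows (intersectSorted is a hypothetical helper written for illustration, not part of the patches):

    // For two ascending index arrays, emit (position in filterIndices, position in
    // indices) for every common value, in one O(n + m) pass.
    def intersectSorted(indices: Array[Int], filterIndices: Array[Int]): Array[(Int, Int)] = {
      val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
      var i = 0
      var j = 0
      while (i < indices.length && j < filterIndices.length) {
        if (indices(i) == filterIndices(j)) {
          out += ((j, i))
          i += 1
          j += 1
        } else if (indices(i) > filterIndices(j)) {
          j += 1  // selected feature absent from this sparse vector: skip it
        } else {
          i += 1  // stored feature not selected: drop it
        }
      }
      out.toArray
    }

In ChiSqSelectorModel.compress the j side becomes the new sparse index and values(i) the kept value, so the output stays ordered with no extra sort.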