diff --git a/bin/spark-sql b/bin/spark-sql
index 61ebd8ab6dec8..7813ccc361415 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -29,7 +29,7 @@ CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
FWDIR="$(cd `dirname $0`/..; pwd)"
function usage {
- echo "Usage: ./sbin/spark-sql [options] [cli option]"
+ echo "Usage: ./bin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e1c49e35abecd..0159003c88e06 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1004,7 +1004,7 @@ abstract class RDD[T: ClassTag](
},
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
h1.addAll(h2)
- h2
+ h1
}).cardinality()
}
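
The one-character fix above matters because HyperLogLogPlus.addAll mutates its receiver, so returning h2 silently dropped the merged registers at every combine step. A minimal sketch of exercising the corrected path through the public API, assuming a local master; the object name and toy data are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object ApproxDistinctSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("approx-distinct").setMaster("local[2]"))
    // 5000 values folded into 1000 distinct keys, spread over several partitions
    // so the per-partition HyperLogLog++ sketches actually get merged by the combiner above.
    val rdd = sc.parallelize(1 to 5000, 10).map(_ % 1000)
    val estimate = rdd.countApproxDistinct(12, 0) // precision p = 12, sparse precision sp = 0
    println(s"estimate = $estimate, exact = 1000")
    sc.stop()
  }
}
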
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index b31e3a09e5b9c..4a7dc8dca25e2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -81,11 +81,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
- val size = 100
- val uniformDistro = for (i <- 1 to 100000) yield i % size
- val simpleRdd = sc.makeRDD(uniformDistro)
- assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
- assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
+ val size = 1000
+ val uniformDistro = for (i <- 1 to 5000) yield i % size
+ val simpleRdd = sc.makeRDD(uniformDistro, 10)
+ assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
+ assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
}
test("SparkContext.union") {
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 42473629d4f15..1867cf4ec46ca 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -35,6 +35,12 @@ RELEASE_VERSION=${RELEASE_VERSION:-1.0.0}
RC_NAME=${RC_NAME:-rc2}
USER_NAME=${USER_NAME:-pwendell}
+if [ -z "$JAVA_HOME" ]; then
+ echo "Error: JAVA_HOME is not set, cannot proceed."
+ exit -1
+fi
+JAVA_7_HOME=${JAVA_7_HOME:-$JAVA_HOME}
+
set -e
GIT_TAG=v$RELEASE_VERSION-$RC_NAME
@@ -130,7 +136,8 @@ scp spark-* \
cd spark
sbt/sbt clean
cd docs
-PRODUCTION=1 jekyll build
+# Compile docs with Java 7 for nicer formatting
+JAVA_HOME=$JAVA_7_HOME PRODUCTION=1 jekyll build
echo "Copying release documentation"
rc_docs_folder=${rc_folder}-docs
ssh $USER_NAME@people.apache.org \
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index fd0b9556c7d54..ba7ccd8ce4b8b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -25,16 +25,14 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.tree.configuration.Algo._
-import org.apache.spark.mllib.tree.configuration.Strategy
+import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.DecisionTree
-import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
+import org.apache.spark.mllib.tree.impurity._
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.correlation.CorrelationNames
@@ -523,17 +521,8 @@ class PythonMLLibAPI extends Serializable {
val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
- val algo: Algo = algoStr match {
- case "classification" => Classification
- case "regression" => Regression
- case _ => throw new IllegalArgumentException(s"Bad algoStr parameter: $algoStr")
- }
- val impurity: Impurity = impurityStr match {
- case "gini" => Gini
- case "entropy" => Entropy
- case "variance" => Variance
- case _ => throw new IllegalArgumentException(s"Bad impurityStr parameter: $impurityStr")
- }
+ val algo = Algo.fromString(algoStr)
+ val impurity = Impurities.fromString(impurityStr)
val strategy = new Strategy(
algo = algo,
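
With both match blocks gone, the Python gateway reuses the same string parsing as the new Scala entry points. A hedged sketch of what the remaining wrapper body boils down to: the fromString helpers are private[mllib], so this only compiles inside that package; the object and method names here are hypothetical, and the real wrapper additionally deserializes the Python byte RDD and forwards categoricalFeaturesInfo:

package org.apache.spark.mllib.api.python

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.impurity.Impurities
import org.apache.spark.rdd.RDD

object TrainSketch {
  // Strings coming from Python become typed settings via the new factories.
  def train(data: RDD[LabeledPoint], algoStr: String, impurityStr: String,
            numClasses: Int, maxDepth: Int, maxBins: Int) = {
    val strategy = new Strategy(
      algo = Algo.fromString(algoStr),               // "classification" | "regression"
      impurity = Impurities.fromString(impurityStr), // "gini" | "entropy" | "variance"
      maxDepth = maxDepth,
      numClassesForClassification = numClasses,
      maxBins = maxBins)
    DecisionTree.train(data, strategy)
  }
}
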
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index 7ed611a857acc..d40d5553c1d21 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -36,87 +36,25 @@ class IDF {
// TODO: Allow different IDF formulations.
- private var brzIdf: BDV[Double] = _
-
/**
* Computes the inverse document frequency.
* @param dataset an RDD of term frequency vectors
*/
- def fit(dataset: RDD[Vector]): this.type = {
- brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
+ def fit(dataset: RDD[Vector]): IDFModel = {
+ val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
seqOp = (df, v) => df.add(v),
combOp = (df1, df2) => df1.merge(df2)
).idf()
- this
+ new IDFModel(idf)
}
/**
* Computes the inverse document frequency.
* @param dataset a JavaRDD of term frequency vectors
*/
- def fit(dataset: JavaRDD[Vector]): this.type = {
+ def fit(dataset: JavaRDD[Vector]): IDFModel = {
fit(dataset.rdd)
}
-
- /**
- * Transforms term frequency (TF) vectors to TF-IDF vectors.
- * @param dataset an RDD of term frequency vectors
- * @return an RDD of TF-IDF vectors
- */
- def transform(dataset: RDD[Vector]): RDD[Vector] = {
- if (!initialized) {
- throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
- }
- val theIdf = brzIdf
- val bcIdf = dataset.context.broadcast(theIdf)
- dataset.mapPartitions { iter =>
- val thisIdf = bcIdf.value
- iter.map { v =>
- val n = v.size
- v match {
- case sv: SparseVector =>
- val nnz = sv.indices.size
- val newValues = new Array[Double](nnz)
- var k = 0
- while (k < nnz) {
- newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
- k += 1
- }
- Vectors.sparse(n, sv.indices, newValues)
- case dv: DenseVector =>
- val newValues = new Array[Double](n)
- var j = 0
- while (j < n) {
- newValues(j) = dv.values(j) * thisIdf(j)
- j += 1
- }
- Vectors.dense(newValues)
- case other =>
- throw new UnsupportedOperationException(
- s"Only sparse and dense vectors are supported but got ${other.getClass}.")
- }
- }
- }
- }
-
- /**
- * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
- * @param dataset a JavaRDD of term frequency vectors
- * @return a JavaRDD of TF-IDF vectors
- */
- def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
- transform(dataset.rdd).toJavaRDD()
- }
-
- /** Returns the IDF vector. */
- def idf(): Vector = {
- if (!initialized) {
- throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
- }
- Vectors.fromBreeze(brzIdf)
- }
-
- private def initialized: Boolean = brzIdf != null
}
private object IDF {
@@ -177,18 +115,72 @@ private object IDF {
private def isEmpty: Boolean = m == 0L
/** Returns the current IDF vector. */
- def idf(): BDV[Double] = {
+ def idf(): Vector = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
- val inv = BDV.zeros[Double](n)
+ val inv = new Array[Double](n)
var j = 0
while (j < n) {
inv(j) = math.log((m + 1.0)/ (df(j) + 1.0))
j += 1
}
- inv
+ Vectors.dense(inv)
}
}
}
+
+/**
+ * :: Experimental ::
+ * Represents an IDF model that can transform term frequency vectors.
+ */
+@Experimental
+class IDFModel private[mllib] (val idf: Vector) extends Serializable {
+
+ /**
+ * Transforms term frequency (TF) vectors to TF-IDF vectors.
+ * @param dataset an RDD of term frequency vectors
+ * @return an RDD of TF-IDF vectors
+ */
+ def transform(dataset: RDD[Vector]): RDD[Vector] = {
+ val bcIdf = dataset.context.broadcast(idf)
+ dataset.mapPartitions { iter =>
+ val thisIdf = bcIdf.value
+ iter.map { v =>
+ val n = v.size
+ v match {
+ case sv: SparseVector =>
+ val nnz = sv.indices.size
+ val newValues = new Array[Double](nnz)
+ var k = 0
+ while (k < nnz) {
+ newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
+ k += 1
+ }
+ Vectors.sparse(n, sv.indices, newValues)
+ case dv: DenseVector =>
+ val newValues = new Array[Double](n)
+ var j = 0
+ while (j < n) {
+ newValues(j) = dv.values(j) * thisIdf(j)
+ j += 1
+ }
+ Vectors.dense(newValues)
+ case other =>
+ throw new UnsupportedOperationException(
+ s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
+ * @param dataset a JavaRDD of term frequency vectors
+ * @return a JavaRDD of TF-IDF vectors
+ */
+ def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
+ transform(dataset.rdd).toJavaRDD()
+ }
+}
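
The fit/transform split means callers now hold an immutable IDFModel instead of mutating the IDF estimator. A minimal usage sketch with toy term-frequency vectors (object name and data are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vectors

object IdfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("idf-sketch").setMaster("local[2]"))
    // Toy term-frequency vectors over a 4-term vocabulary.
    val tf = sc.parallelize(Seq(
      Vectors.dense(1.0, 0.0, 2.0, 0.0),
      Vectors.sparse(4, Array(1, 3), Array(3.0, 1.0)),
      Vectors.dense(0.0, 1.0, 1.0, 4.0)))
    val model = new IDF().fit(tf)      // returns an IDFModel; IDF itself stays stateless
    val tfidf = model.transform(tf)    // TF-IDF vectors; the idf vector is broadcast once
    println(model.idf)                 // the learned IDF vector is exposed directly
    tfidf.collect().foreach(println)
    sc.stop()
  }
}
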
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
index e6c9f8f67df63..4dfd1f0ab8134 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -17,8 +17,9 @@
package org.apache.spark.mllib.feature
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.rdd.RDDFunctions._
@@ -35,37 +36,55 @@ import org.apache.spark.rdd.RDD
* @param withStd True by default. Scales the data to unit standard deviation.
*/
@Experimental
-class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
+class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {
def this() = this(false, true)
- require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")
-
- private var mean: BV[Double] = _
- private var factor: BV[Double] = _
+ if (!(withMean || withStd)) {
+ logWarning("Both withMean and withStd are false. The model does nothing.")
+ }
/**
* Computes the mean and variance and stores as a model to be used for later scaling.
*
* @param data The data used to compute the mean and variance to build the transformation model.
- * @return This StandardScalar object.
+ * @return a StandardScalerModel
*/
- def fit(data: RDD[Vector]): this.type = {
+ def fit(data: RDD[Vector]): StandardScalerModel = {
+ // TODO: skip computation if both withMean and withStd are false
val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
(aggregator, data) => aggregator.add(data),
(aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+ new StandardScalerModel(withMean, withStd, summary.mean, summary.variance)
+ }
+}
- mean = summary.mean.toBreeze
- factor = summary.variance.toBreeze
- require(mean.length == factor.length)
+/**
+ * :: Experimental ::
+ * Represents a StandardScaler model that can transform vectors.
+ *
+ * @param withMean whether to center the data before scaling
+ * @param withStd whether to scale the data to have unit standard deviation
+ * @param mean column mean values
+ * @param variance column variance values
+ */
+@Experimental
+class StandardScalerModel private[mllib] (
+ val withMean: Boolean,
+ val withStd: Boolean,
+ val mean: Vector,
+ val variance: Vector) extends VectorTransformer {
+
+ require(mean.size == variance.size)
+ private lazy val factor: BDV[Double] = {
+ val f = BDV.zeros[Double](variance.size)
var i = 0
- while (i < factor.length) {
- factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
+ while (i < f.size) {
+ f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
i += 1
}
-
- this
+ f
}
/**
@@ -76,13 +95,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
* for the column with zero variance.
*/
override def transform(vector: Vector): Vector = {
- if (mean == null || factor == null) {
- throw new IllegalStateException(
- "Haven't learned column summary statistics yet. Call fit first.")
- }
-
- require(vector.size == mean.length)
-
+ require(mean.size == vector.size)
if (withMean) {
vector.toBreeze match {
case dv: BDV[Double] =>
@@ -115,5 +128,4 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
vector
}
}
-
}
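
StandardScaler follows the same estimator/model split: fit returns a StandardScalerModel carrying the column means and variances, and the old hard require becomes a log warning. A small sketch, again with hypothetical toy data:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

object ScalerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("scaler-sketch").setMaster("local[2]"))
    val data = sc.parallelize(Seq(
      Vectors.dense(1.0, 10.0, 100.0),
      Vectors.dense(2.0, 20.0, 200.0),
      Vectors.dense(3.0, 30.0, 300.0)))
    // withMean requires dense input; withStd alone also works on sparse vectors.
    val model = new StandardScaler(withMean = true, withStd = true).fit(data)
    println(s"mean = ${model.mean}, variance = ${model.variance}")
    val scaled = model.transform(data) // VectorTransformer gives both RDD and per-vector transforms
    scaled.collect().foreach(println)
    sc.stop()
  }
}
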
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 1d03e6e3b36cf..bb50f07be5d7b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -17,14 +17,18 @@
package org.apache.spark.mllib.tree
+import org.apache.spark.api.java.JavaRDD
+
+import scala.collection.JavaConverters._
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.tree.configuration.Strategy
+import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.FeatureType._
import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
-import org.apache.spark.mllib.tree.impurity.Impurity
+import org.apache.spark.mllib.tree.impurity.{Impurities, Gini, Entropy, Impurity}
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
@@ -40,6 +44,8 @@ import org.apache.spark.util.random.XORShiftRandom
@Experimental
class DecisionTree (private val strategy: Strategy) extends Serializable with Logging {
+ strategy.assertValid()
+
/**
* Method to train a decision tree model over an RDD
* @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
@@ -200,6 +206,10 @@ object DecisionTree extends Serializable with Logging {
* Method to train a decision tree model.
* The method supports binary and multiclass classification and regression.
*
+ * Note: Using [[org.apache.spark.mllib.tree.DecisionTree$#trainClassifier]]
+ * and [[org.apache.spark.mllib.tree.DecisionTree$#trainRegressor]]
+ * is recommended to clearly separate classification and regression.
+ *
* @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* For classification, labels should take values {0, 1, ..., numClasses-1}.
* For regression, labels are real numbers.
@@ -213,10 +223,12 @@ object DecisionTree extends Serializable with Logging {
}
/**
- * Method to train a decision tree model where the instances are represented as an RDD of
- * (label, features) pairs. The method supports binary classification and regression. For the
- * binary classification, the label for each instance should either be 0 or 1 to denote the two
- * classes.
+ * Method to train a decision tree model.
+ * The method supports binary and multiclass classification and regression.
+ *
+ * Note: Using [[org.apache.spark.mllib.tree.DecisionTree$#trainClassifier]]
+ * and [[org.apache.spark.mllib.tree.DecisionTree$#trainRegressor]]
+ * is recommended to clearly separate classification and regression.
*
* @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* For classification, labels should take values {0, 1, ..., numClasses-1}.
@@ -237,10 +249,12 @@ object DecisionTree extends Serializable with Logging {
}
/**
- * Method to train a decision tree model where the instances are represented as an RDD of
- * (label, features) pairs. The method supports binary classification and regression. For the
- * binary classification, the label for each instance should either be 0 or 1 to denote the two
- * classes.
+ * Method to train a decision tree model.
+ * The method supports binary and multiclass classification and regression.
+ *
+ * Note: Using [[org.apache.spark.mllib.tree.DecisionTree$#trainClassifier]]
+ * and [[org.apache.spark.mllib.tree.DecisionTree$#trainRegressor]]
+ * is recommended to clearly separate classification and regression.
*
* @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* For classification, labels should take values {0, 1, ..., numClasses-1}.
@@ -263,11 +277,12 @@ object DecisionTree extends Serializable with Logging {
}
/**
- * Method to train a decision tree model where the instances are represented as an RDD of
- * (label, features) pairs. The decision tree method supports binary classification and
- * regression. For the binary classification, the label for each instance should either be 0 or
- * 1 to denote the two classes. The method also supports categorical features inputs where the
- * number of categories can specified using the categoricalFeaturesInfo option.
+ * Method to train a decision tree model.
+ * The method supports binary and multiclass classification and regression.
+ *
+ * Note: Using [[org.apache.spark.mllib.tree.DecisionTree$#trainClassifier]]
+ * and [[org.apache.spark.mllib.tree.DecisionTree$#trainRegressor]]
+ * is recommended to clearly separate classification and regression.
*
* @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* For classification, labels should take values {0, 1, ..., numClasses-1}.
@@ -279,11 +294,9 @@ object DecisionTree extends Serializable with Logging {
* @param numClassesForClassification number of classes for classification. Default value of 2.
* @param maxBins maximum number of bins used for splitting features
* @param quantileCalculationStrategy algorithm for calculating quantiles
- * @param categoricalFeaturesInfo A map storing information about the categorical variables and
- * the number of discrete values they take. For example,
- * an entry (n -> k) implies the feature n is categorical with k
- * categories 0, 1, 2, ... , k-1. It's important to note that
- * features are zero-indexed.
+ * @param categoricalFeaturesInfo Map storing arity of categorical features.
+ * E.g., an entry (n -> k) indicates that feature n is categorical
+ * with k categories indexed from 0: {0, 1, ..., k-1}.
* @return DecisionTreeModel that can be used for prediction
*/
def train(
@@ -300,6 +313,93 @@ object DecisionTree extends Serializable with Logging {
new DecisionTree(strategy).train(input)
}
+ /**
+ * Method to train a decision tree model for binary or multiclass classification.
+ *
+ * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+ * Labels should take values {0, 1, ..., numClasses-1}.
+ * @param numClassesForClassification number of classes for classification.
+ * @param categoricalFeaturesInfo Map storing arity of categorical features.
+ * E.g., an entry (n -> k) indicates that feature n is categorical
+ * with k categories indexed from 0: {0, 1, ..., k-1}.
+ * @param impurity Criterion used for information gain calculation.
+ * Supported values: "gini" (recommended) or "entropy".
+ * @param maxDepth Maximum depth of the tree.
+ * E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
+ * (suggested value: 4)
+ * @param maxBins maximum number of bins used for splitting features
+ * (suggested value: 100)
+ * @return DecisionTreeModel that can be used for prediction
+ */
+ def trainClassifier(
+ input: RDD[LabeledPoint],
+ numClassesForClassification: Int,
+ categoricalFeaturesInfo: Map[Int, Int],
+ impurity: String,
+ maxDepth: Int,
+ maxBins: Int): DecisionTreeModel = {
+ val impurityType = Impurities.fromString(impurity)
+ train(input, Classification, impurityType, maxDepth, numClassesForClassification, maxBins, Sort,
+ categoricalFeaturesInfo)
+ }
+
+ /**
+ * Java-friendly API for [[org.apache.spark.mllib.tree.DecisionTree$#trainClassifier]]
+ */
+ def trainClassifier(
+ input: JavaRDD[LabeledPoint],
+ numClassesForClassification: Int,
+ categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
+ impurity: String,
+ maxDepth: Int,
+ maxBins: Int): DecisionTreeModel = {
+ trainClassifier(input.rdd, numClassesForClassification,
+ categoricalFeaturesInfo.asInstanceOf[java.util.Map[Int, Int]].asScala.toMap,
+ impurity, maxDepth, maxBins)
+ }
+
+ /**
+ * Method to train a decision tree model for regression.
+ *
+ * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+ * Labels are real numbers.
+ * @param categoricalFeaturesInfo Map storing arity of categorical features.
+ * E.g., an entry (n -> k) indicates that feature n is categorical
+ * with k categories indexed from 0: {0, 1, ..., k-1}.
+ * @param impurity Criterion used for information gain calculation.
+ * Supported values: "variance".
+ * @param maxDepth Maximum depth of the tree.
+ * E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
+ * (suggested value: 4)
+ * @param maxBins maximum number of bins used for splitting features
+ * (suggested value: 100)
+ * @return DecisionTreeModel that can be used for prediction
+ */
+ def trainRegressor(
+ input: RDD[LabeledPoint],
+ categoricalFeaturesInfo: Map[Int, Int],
+ impurity: String,
+ maxDepth: Int,
+ maxBins: Int): DecisionTreeModel = {
+ val impurityType = Impurities.fromString(impurity)
+ train(input, Regression, impurityType, maxDepth, 0, maxBins, Sort, categoricalFeaturesInfo)
+ }
+
+ /**
+ * Java-friendly API for [[org.apache.spark.mllib.tree.DecisionTree$#trainRegressor]]
+ */
+ def trainRegressor(
+ input: JavaRDD[LabeledPoint],
+ categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
+ impurity: String,
+ maxDepth: Int,
+ maxBins: Int): DecisionTreeModel = {
+ trainRegressor(input.rdd,
+ categoricalFeaturesInfo.asInstanceOf[java.util.Map[Int, Int]].asScala.toMap,
+ impurity, maxDepth, maxBins)
+ }
+
+
private val InvalidBinIndex = -1
/**
@@ -1331,16 +1431,15 @@ object DecisionTree extends Serializable with Logging {
* Categorical features:
* For each feature, there is 1 bin per split.
* Splits and bins are handled in 2 ways:
- * (a) For multiclass classification with a low-arity feature
+ * (a) "unordered features"
+ * For multiclass classification with a low-arity feature
* (i.e., if isMulticlass && isSpaceSufficientForAllCategoricalSplits),
* the feature is split based on subsets of categories.
- * There are 2^(maxFeatureValue - 1) - 1 splits.
- * (b) For regression and binary classification,
+ * There are math.pow(2, maxFeatureValue - 1) - 1 splits.
+ * (b) "ordered features"
+ * For regression and binary classification,
* and for multiclass classification with a high-arity feature,
- * there is one split per category.
-
- * Categorical case (a) features are called unordered features.
- * Other cases are called ordered features.
+ * there is one bin per category.
*
* @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
* @param strategy [[org.apache.spark.mllib.tree.configuration.Strategy]] instance containing
@@ -1368,10 +1467,14 @@ object DecisionTree extends Serializable with Logging {
/*
- * Ensure #bins is always greater than the categories. For multiclass classification,
- * #bins should be greater than 2^(maxCategories - 1) - 1.
+ * Ensure numBins is always greater than the categories. For multiclass classification,
+ * numBins should be greater than 2^(maxCategories - 1) - 1.
* It's a limitation of the current implementation but a reasonable trade-off since features
* with large number of categories get favored over continuous features.
+ *
+ * This needs to be checked here instead of in Strategy since numBins can be determined
+ * by the number of training examples.
+ * TODO: Allow this case, where we simply will know nothing about some categories.
*/
if (strategy.categoricalFeaturesInfo.size > 0) {
val maxCategoriesForFeatures = strategy.categoricalFeaturesInfo.maxBy(_._2)._2
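
The new trainClassifier and trainRegressor entry points hide Strategy entirely. A compact sketch of both, using toy LabeledPoint data and the suggested maxDepth/maxBins values from the docs above (object name and data are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree

object TreeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tree-sketch").setMaster("local[2]"))
    val points = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 1.0)),
      LabeledPoint(0.0, Vectors.dense(0.0, 0.0))))
    // Classification: labels in {0, 1}, no categorical features, gini impurity.
    val classifier = DecisionTree.trainClassifier(points, numClassesForClassification = 2,
      categoricalFeaturesInfo = Map[Int, Int](), impurity = "gini", maxDepth = 4, maxBins = 100)
    // Regression: same data treated as real-valued labels, variance impurity.
    val regressor = DecisionTree.trainRegressor(points,
      categoricalFeaturesInfo = Map[Int, Int](), impurity = "variance", maxDepth = 4, maxBins = 100)
    println(classifier.predict(Vectors.dense(1.0, 0.0)))
    println(regressor.predict(Vectors.dense(0.0, 1.0)))
    sc.stop()
  }
}
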
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
index 79a01f58319e8..0ef9c6181a0a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
@@ -27,4 +27,10 @@ import org.apache.spark.annotation.Experimental
object Algo extends Enumeration {
type Algo = Value
val Classification, Regression = Value
+
+ private[mllib] def fromString(name: String): Algo = name match {
+ case "classification" => Classification
+ case "regression" => Regression
+ case _ => throw new IllegalArgumentException(s"Did not recognize Algo name: $name")
+ }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
index 4ee4bcd0bcbc7..f31a503608b22 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.tree.configuration
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.tree.impurity.Impurity
+import org.apache.spark.mllib.tree.impurity.{Variance, Entropy, Gini, Impurity}
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
@@ -90,4 +90,33 @@ class Strategy (
categoricalFeaturesInfo.asInstanceOf[java.util.Map[Int, Int]].asScala.toMap)
}
+ private[tree] def assertValid(): Unit = {
+ algo match {
+ case Classification =>
+ require(numClassesForClassification >= 2,
+ s"DecisionTree Strategy for Classification must have numClassesForClassification >= 2," +
+ s" but numClassesForClassification = $numClassesForClassification.")
+ require(Set(Gini, Entropy).contains(impurity),
+ s"DecisionTree Strategy given invalid impurity for Classification: $impurity." +
+ s" Valid settings: Gini, Entropy")
+ case Regression =>
+ require(impurity == Variance,
+ s"DecisionTree Strategy given invalid impurity for Regression: $impurity." +
+ s" Valid settings: Variance")
+ case _ =>
+ throw new IllegalArgumentException(
+ s"DecisionTree Strategy given invalid algo parameter: $algo." +
+ s" Valid settings are: Classification, Regression.")
+ }
+ require(maxDepth >= 0, s"DecisionTree Strategy given invalid maxDepth parameter: $maxDepth." +
+ s" Valid values are integers >= 0.")
+ require(maxBins >= 2, s"DecisionTree Strategy given invalid maxBins parameter: $maxBins." +
+ s" Valid values are integers >= 2.")
+ categoricalFeaturesInfo.foreach { case (feature, arity) =>
+ require(arity >= 2,
+ s"DecisionTree Strategy given invalid categoricalFeaturesInfo setting:" +
+ s" feature $feature has $arity categories. The number of categories should be >= 2.")
+ }
+ }
+
}
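
Because assertValid is invoked from the DecisionTree constructor, a misconfigured Strategy now fails before any pass over the data. A hedged sketch of that failure mode, assuming the remaining Strategy constructor parameters keep their defaults (object name is hypothetical):

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo.Classification
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.impurity.Variance

object StrategyValidationSketch {
  def main(args: Array[String]): Unit = {
    // Variance is only valid for Regression, so constructing the tree throws
    // IllegalArgumentException from the require(...) calls added above.
    val badStrategy = new Strategy(algo = Classification, impurity = Variance, maxDepth = 5)
    try {
      new DecisionTree(badStrategy)
    } catch {
      case e: IllegalArgumentException => println(s"rejected up front: ${e.getMessage}")
    }
  }
}
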
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurities.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurities.scala
new file mode 100644
index 0000000000000..9a6452aa13a61
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurities.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impurity
+
+/**
+ * Factory for Impurity instances.
+ */
+private[mllib] object Impurities {
+
+ def fromString(name: String): Impurity = name match {
+ case "gini" => Gini
+ case "entropy" => Entropy
+ case "variance" => Variance
+ case _ => throw new IllegalArgumentException(s"Did not recognize Impurity name: $name")
+ }
+
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
index 78a2804ff204b..53d9c0c640b98 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -36,18 +36,12 @@ class IDFSuite extends FunSuite with LocalSparkContext {
val m = localTermFrequencies.size
val termFrequencies = sc.parallelize(localTermFrequencies, 2)
val idf = new IDF
- intercept[IllegalStateException] {
- idf.idf()
- }
- intercept[IllegalStateException] {
- idf.transform(termFrequencies)
- }
- idf.fit(termFrequencies)
+ val model = idf.fit(termFrequencies)
val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
math.log((m.toDouble + 1.0) / (x + 1.0))
})
- assert(idf.idf() ~== expected absTol 1e-12)
- val tfidf = idf.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
+ assert(model.idf ~== expected absTol 1e-12)
+ val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
assert(tfidf.size === 3)
val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
assert(tfidf0.indices === Array(1, 3))
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
index 5a9be923a8625..e217b93cebbdb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -50,23 +50,17 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler()
val standardizer3 = new StandardScaler(withMean = true, withStd = false)
- withClue("Using a standardizer before fitting the model should throw exception.") {
- intercept[IllegalStateException] {
- data.map(standardizer1.transform)
- }
- }
-
- standardizer1.fit(dataRDD)
- standardizer2.fit(dataRDD)
- standardizer3.fit(dataRDD)
+ val model1 = standardizer1.fit(dataRDD)
+ val model2 = standardizer2.fit(dataRDD)
+ val model3 = standardizer3.fit(dataRDD)
- val data1 = data.map(standardizer1.transform)
- val data2 = data.map(standardizer2.transform)
- val data3 = data.map(standardizer3.transform)
+ val data1 = data.map(model1.transform)
+ val data2 = data.map(model2.transform)
+ val data3 = data.map(model3.transform)
- val data1RDD = standardizer1.transform(dataRDD)
- val data2RDD = standardizer2.transform(dataRDD)
- val data3RDD = standardizer3.transform(dataRDD)
+ val data1RDD = model1.transform(dataRDD)
+ val data2RDD = model2.transform(dataRDD)
+ val data3RDD = model3.transform(dataRDD)
val summary = computeSummary(dataRDD)
val summary1 = computeSummary(data1RDD)
@@ -129,25 +123,25 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler()
val standardizer3 = new StandardScaler(withMean = true, withStd = false)
- standardizer1.fit(dataRDD)
- standardizer2.fit(dataRDD)
- standardizer3.fit(dataRDD)
+ val model1 = standardizer1.fit(dataRDD)
+ val model2 = standardizer2.fit(dataRDD)
+ val model3 = standardizer3.fit(dataRDD)
- val data2 = data.map(standardizer2.transform)
+ val data2 = data.map(model2.transform)
withClue("Standardization with mean can not be applied on sparse input.") {
intercept[IllegalArgumentException] {
- data.map(standardizer1.transform)
+ data.map(model1.transform)
}
}
withClue("Standardization with mean can not be applied on sparse input.") {
intercept[IllegalArgumentException] {
- data.map(standardizer3.transform)
+ data.map(model3.transform)
}
}
- val data2RDD = standardizer2.transform(dataRDD)
+ val data2RDD = model2.transform(dataRDD)
val summary2 = computeSummary(data2RDD)
@@ -181,13 +175,13 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler(withMean = true, withStd = false)
val standardizer3 = new StandardScaler(withMean = false, withStd = true)
- standardizer1.fit(dataRDD)
- standardizer2.fit(dataRDD)
- standardizer3.fit(dataRDD)
+ val model1 = standardizer1.fit(dataRDD)
+ val model2 = standardizer2.fit(dataRDD)
+ val model3 = standardizer3.fit(dataRDD)
- val data1 = data.map(standardizer1.transform)
- val data2 = data.map(standardizer2.transform)
- val data3 = data.map(standardizer3.transform)
+ val data1 = data.map(model1.transform)
+ val data2 = data.map(model2.transform)
+ val data3 = data.map(model3.transform)
assert(data1.forall(_.toArray.forall(_ == 0.0)),
"The variance is zero, so the transformed result should be 0.0")
diff --git a/pom.xml b/pom.xml
index 76bf6d8f902a8..920912353fe9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,8 +146,7 @@
        <id>central</id>
        <name>Maven Repository</name>
-
-       <url>https://repo.maven.apache.org/maven2</url>
+       <url>https://repo1.maven.org/maven2</url>
        <enabled>true</enabled>
@@ -229,6 +228,9 @@
        <enabled>true</enabled>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ed587783d5606..63a285b81a60c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -30,11 +30,11 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
- val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark,
+ val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, streaming, streamingFlumeSink, streamingFlume, streamingKafka, streamingMqtt,
streamingTwitter, streamingZeromq) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
- "spark", "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka",
+ "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka",
"streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) =
@@ -44,8 +44,9 @@ object BuildCommons {
val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
.map(ProjectRef(buildLocation, _))
- val tools = "tools"
-
+ val tools = ProjectRef(buildLocation, "tools")
+ // Root project.
+ val spark = ProjectRef(buildLocation, "spark")
val sparkHome = buildLocation
}
@@ -126,26 +127,6 @@ object SparkBuild extends PomBuild {
publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn
)
- /** Following project only exists to pull previous artifacts of Spark for generating
- Mima ignores. For more information see: SPARK 2071 */
- lazy val oldDeps = Project("oldDeps", file("dev"), settings = oldDepsSettings)
-
- def versionArtifact(id: String): Option[sbt.ModuleID] = {
- val fullId = id + "_2.10"
- Some("org.apache.spark" % fullId % "1.0.0")
- }
-
- def oldDepsSettings() = Defaults.defaultSettings ++ Seq(
- name := "old-deps",
- scalaVersion := "2.10.4",
- retrieveManaged := true,
- retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
- libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
- "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
- "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
- "spark-core").map(versionArtifact(_).get intransitive())
- )
-
def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
val existingSettings = projectsMap.getOrElse(projectRef.project, Seq[Setting[_]]())
projectsMap += (projectRef.project -> (existingSettings ++ settings))
@@ -184,7 +165,7 @@ object SparkBuild extends PomBuild {
super.projectDefinitions(baseDirectory).map { x =>
if (projectsMap.exists(_._1 == x.id)) x.settings(projectsMap(x.id): _*)
else x.settings(Seq[Setting[_]](): _*)
- } ++ Seq[Project](oldDeps)
+ } ++ Seq[Project](OldDeps.project)
}
}
@@ -193,6 +174,31 @@ object Flume {
lazy val settings = sbtavro.SbtAvro.avroSettings
}
+/**
+ * Following project only exists to pull previous artifacts of Spark for generating
+ * Mima ignores. For more information see: SPARK 2071
+ */
+object OldDeps {
+
+ lazy val project = Project("oldDeps", file("dev"), settings = oldDepsSettings)
+
+ def versionArtifact(id: String): Option[sbt.ModuleID] = {
+ val fullId = id + "_2.10"
+ Some("org.apache.spark" % fullId % "1.0.0")
+ }
+
+ def oldDepsSettings() = Defaults.defaultSettings ++ Seq(
+ name := "old-deps",
+ scalaVersion := "2.10.4",
+ retrieveManaged := true,
+ retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
+ libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
+ "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
+ "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
+ "spark-core").map(versionArtifact(_).get intransitive())
+ )
+}
+
object Catalyst {
lazy val settings = Seq(
addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full),
@@ -285,9 +291,9 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(repl, examples, tools, catalyst, yarn, yarnAlpha),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, yarn, yarnAlpha),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(repl, bagel, graphx, examples, tools, catalyst, yarn, yarnAlpha),
+ inAnyProject -- inProjects(OldDeps.project, repl, bagel, graphx, examples, tools, catalyst, yarn, yarnAlpha),
// Skip class names containing $ and some internal packages in Javadocs
unidocAllSources in (JavaUnidoc, unidoc) := {
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 06d18e193076e..2a61f56c2ea60 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -23,6 +23,6 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1")
-addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0")
+addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1")
addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 2518001ea0b93..e1a4671709b7d 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -131,7 +131,7 @@ class DecisionTree(object):
"""
@staticmethod
- def trainClassifier(data, numClasses, categoricalFeaturesInfo={},
+ def trainClassifier(data, numClasses, categoricalFeaturesInfo,
impurity="gini", maxDepth=4, maxBins=100):
"""
Train a DecisionTreeModel for classification.
@@ -150,12 +150,20 @@ def trainClassifier(data, numClasses, categoricalFeaturesInfo={},
:param maxBins: Number of bins used for finding splits at each node.
:return: DecisionTreeModel
"""
- return DecisionTree.train(data, "classification", numClasses,
- categoricalFeaturesInfo,
- impurity, maxDepth, maxBins)
+ sc = data.context
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
+ categoricalFeaturesInfoJMap = \
+ MapConverter().convert(categoricalFeaturesInfo,
+ sc._gateway._gateway_client)
+ model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
+ dataBytes._jrdd, "classification",
+ numClasses, categoricalFeaturesInfoJMap,
+ impurity, maxDepth, maxBins)
+ dataBytes.unpersist()
+ return DecisionTreeModel(sc, model)
@staticmethod
- def trainRegressor(data, categoricalFeaturesInfo={},
+ def trainRegressor(data, categoricalFeaturesInfo,
impurity="variance", maxDepth=4, maxBins=100):
"""
Train a DecisionTreeModel for regression.
@@ -173,42 +181,14 @@ def trainRegressor(data, categoricalFeaturesInfo={},
:param maxBins: Number of bins used for finding splits at each node.
:return: DecisionTreeModel
"""
- return DecisionTree.train(data, "regression", 0,
- categoricalFeaturesInfo,
- impurity, maxDepth, maxBins)
-
- @staticmethod
- def train(data, algo, numClasses, categoricalFeaturesInfo,
- impurity, maxDepth, maxBins=100):
- """
- Train a DecisionTreeModel for classification or regression.
-
- :param data: Training data: RDD of LabeledPoint.
- For classification, labels are integers
- {0,1,...,numClasses}.
- For regression, labels are real numbers.
- :param algo: "classification" or "regression"
- :param numClasses: Number of classes for classification.
- :param categoricalFeaturesInfo: Map from categorical feature index
- to number of categories.
- Any feature not in this map
- is treated as continuous.
- :param impurity: For classification: "entropy" or "gini".
- For regression: "variance".
- :param maxDepth: Max depth of tree.
- E.g., depth 0 means 1 leaf node.
- Depth 1 means 1 internal node + 2 leaf nodes.
- :param maxBins: Number of bins used for finding splits at each node.
- :return: DecisionTreeModel
- """
sc = data.context
dataBytes = _get_unmangled_labeled_point_rdd(data)
categoricalFeaturesInfoJMap = \
MapConverter().convert(categoricalFeaturesInfo,
sc._gateway._gateway_client)
model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
- dataBytes._jrdd, algo,
- numClasses, categoricalFeaturesInfoJMap,
+ dataBytes._jrdd, "regression",
+ 0, categoricalFeaturesInfoJMap,
impurity, maxDepth, maxBins)
dataBytes.unpersist()
return DecisionTreeModel(sc, model)