diff --git a/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala new file mode 100644 index 0000000000000..47a6c71b78916 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala @@ -0,0 +1,24 @@ +package org.apache.spark.ml + +import org.apache.spark.mllib.linalg.Vector + +/** + * Class that represents an instance (data point) for prediction tasks. + * + * @param label Label to predict + * @param features List of features describing this instance + * @param weight Instance weight + */ +case class LabeledPoint(label: Double, features: Vector, weight: Double) { + + /** Default constructor which sets instance weight to 1.0 */ + def this(label: Double, features: Vector) = this(label, features, 1.0) + + override def toString: String = { + "(%s,%s,%s)".format(label, features, weight) + } +} + +object LabeledPoint { + def apply(label: Double, features: Vector) = new LabeledPoint(label, features) +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/AdaBoost.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/AdaBoost.scala new file mode 100644 index 0000000000000..5517cd95cf42f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/AdaBoost.scala @@ -0,0 +1,208 @@ +package org.apache.spark.ml.classification + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.ml.LabeledPoint +import org.apache.spark.ml.evaluation.ClassificationEvaluator +import org.apache.spark.ml.param.{HasWeightCol, Param, ParamMap, HasMaxIter} +import org.apache.spark.ml.impl.estimator.{ProbabilisticClassificationModel, WeakLearner, + IterativeEstimator, IterativeSolver} + + +private[classification] trait AdaBoostParams extends ClassifierParams + with HasMaxIter with HasWeightCol { + + /** param for weak learner type */ + val weakLearner: Param[Classifier[_, _]] = + new Param(this, "weakLearner", "weak learning algorithm") + def getWeakLearner: Classifier[_, _] = get(weakLearner) + + /** param for weak learner param maps */ + val weakLearnerParamMap: Param[ParamMap] = + new Param(this, "weakLearnerParamMap", "param map for the weak learner") + def getWeakLearnerParamMap: ParamMap = get(weakLearnerParamMap) + + override def validate(paramMap: ParamMap): Unit = { + // TODO: Check maxIter, weakLearner, weakLearnerParamMap, weightCol + // Check: If the weak learner does not extend WeakLearner, then featuresColName should be + // castable to FeaturesType. + } +} + + +/** + * AdaBoost + * + * Developer notes: + * - If the weak learner implements the [[WeakLearner]] + */ +class AdaBoost extends Classifier[AdaBoost, AdaBoostModel] + with AdaBoostParams + with IterativeEstimator[AdaBoostModel] { + + def setMaxIter(value: Int): this.type = set(maxIter, value) + def setWeightCol(value: String): this.type = set(weightCol, value) + def setWeakLearner(value: Classifier[_, _]): this.type = set(weakLearner, value) + def setWeakLearnerParamMap(value: ParamMap): this.type = set(weakLearnerParamMap, value) + + /** + * Extract LabeledPoints, using the weak learner's native feature representation if possible. 
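+ * If the weak learner mixes in [[WeakLearner]], its getNativeFeatureRDD is used to convert the features column into the learner's native representation; otherwise the column is read as plain [[Vector]]s.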
+ * @param paramMap Complete paramMap (after combining with the internal paramMap) + */ + private def extractLabeledPoints(dataset: SchemaRDD, paramMap: ParamMap): RDD[LabeledPoint] = { + import dataset.sqlContext._ + val featuresColName = paramMap(featuresCol) + val wl = paramMap(weakLearner) + val featuresRDD: RDD[Vector] = wl match { + case wlTagged: WeakLearner => + val wlParamMap = paramMap(weakLearnerParamMap) + val wlFeaturesColName = wlParamMap(wl.featuresCol) + val origFeaturesRDD = dataset.select(featuresColName.attr).as(wlFeaturesColName.attr) + wlTagged.getNativeFeatureRDD(origFeaturesRDD, wlParamMap) + case _ => + dataset.select(featuresColName.attr).map { case Row(features: Vector) => features } + } + + val labelColName = paramMap(labelCol) + if (paramMap.contains(weightCol)) { + val weightColName = paramMap(weightCol) + dataset.select(labelColName.attr, weightColName.attr) + .zip(featuresRDD).map { case (Row(label: Double, weight: Double), features: Vector) => + LabeledPoint(label, features, weight) + } + } else { + dataset.select(labelColName.attr) + .zip(featuresRDD).map { case (Row(label: Double), features: Vector) => + LabeledPoint(label, features) + } + } + } + + // From Classifier + override def fit(dataset: SchemaRDD, paramMap: ParamMap): AdaBoostModel = { + val map = this.paramMap ++ paramMap + val labeledPoints: RDD[LabeledPoint] = extractLabeledPoints(dataset, map) + train(labeledPoints, paramMap) + } + + // From IterativeEstimator + override private[ml] def createSolver(dataset: SchemaRDD, paramMap: ParamMap): AdaBoostSolver = { + val map = this.paramMap ++ paramMap + val labeledPoints: RDD[LabeledPoint] = extractLabeledPoints(dataset, map) + new AdaBoostSolver(labeledPoints, this, map) + } + + // From Predictor + override def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): AdaBoostModel = { + val map = this.paramMap ++ paramMap + val solver = new AdaBoostSolver(dataset, this, map) + while (solver.step()) { } + solver.currentModel + } +} + + +class AdaBoostModel private[ml] ( + val weakHypotheses: Array[ClassificationModel[_]], + val weakHypothesisWeights: Array[Double], + override val parent: AdaBoost, + override val fittingParamMap: ParamMap) + extends ClassificationModel[AdaBoostModel] + with ProbabilisticClassificationModel + with AdaBoostParams { + + require(weakHypotheses.size != 0) + require(weakHypotheses.size == weakHypothesisWeights.size) + + // From Classifier.Model: + override val numClasses: Int = weakHypotheses(0).numClasses + + require(weakHypotheses.forall(_.numClasses == numClasses)) + + private val margin: Vector => Double = (features) => { + weakHypotheses.zip(weakHypothesisWeights) + .foldLeft(0.0) { case (total: Double, (wh: ClassificationModel[_], weight: Double)) => + val pred = if (wh.predict(features) == 1.0) 1.0 else -1.0 + total + weight * pred + } + } + + private val score: Vector => Double = (features) => { + val m = margin(features) + 1.0 / (1.0 + math.exp(-2.0 * m)) + } + + override def predictProbabilities(features: Vector): Vector = { + val s = score(features) + Vectors.dense(Array(1.0 - s, s)) + } + + override def predictRaw(features: Vector): Vector = { + val m = margin(features) + Vectors.dense(Array(-m, m)) + } +} + + +private[ml] class AdaBoostSolver( + val origData: RDD[LabeledPoint], + val parent: AdaBoost, + val paramMap: ParamMap) extends IterativeSolver[AdaBoostModel] { + + private val weakHypotheses = new ArrayBuffer[ClassificationModel[_]] + private val weakHypothesisWeights = new ArrayBuffer[Double] + + private 
val wl: Classifier[_, _] = paramMap(parent.weakLearner) + private val wlParamMap = paramMap(parent.weakLearnerParamMap) + override val maxIterations: Int = paramMap(parent.maxIter) + + // TODO: Decide if this alg should cache data, or if that should be left to the user. + + // TODO: check for weights = 0 + // TODO: EDITING HERE NOW: switch to log weights + private var logInstanceWeights: RDD[Double] = origData.map(lp => math.log(lp.weight)) + + override def stepImpl(): Boolean = ??? /*{ + // Check if the weak learner takes instance weights. + val wlDataset = wl match { + case wlWeighted: HasWeightCol => + origData.zip(logInstanceWeights).map { case (lp: LabeledPoint, logWeight: Double) => + LabeledPoint(lp.label, lp.features, weight) + } + case _ => + // Subsample data to simulate the current instance weight distribution. + // TODO: This needs to be done before AdaBoost is committed. + throw new NotImplementedError( + "AdaBoost currently requires that the weak learning algorithm accept instance weights.") + } + // Train the weak learning algorithm. + val weakHypothesis: ClassificationModel[_] = wl match { + case wlTagged: WeakLearner[_] => + // This lets the weak learner know that the features are in its native format. + wlTagged.trainNative(wlDataset, wlParamMap).asInstanceOf[ClassificationModel[_]] + case _ => + wl.train(wlDataset, wlParamMap).asInstanceOf[ClassificationModel[_]] + } + // Add the weighted weak hypothesis to the ensemble. + // TODO: Handle instance weights. + val predictionsAndLabels = wlDataset.map(lp => weakHypothesis.predict(lp.features)) + .zip(wlDataset.map(_.label)) + val eps = ClassificationEvaluator.computeMetric(predictionsAndLabels, "accuracy") + val alpha = 0.5 * (math.log(1.0 - eps) - math.log(eps)) // TODO: handle eps near 0 + weakHypotheses += weakHypothesis + weakHypothesisWeights += alpha + // Update weights. + val newInstanceWeights = instanceWeights.zip(predictionsAndLabels).map { + case (weight: Double, (pred: Double, label: Double)) => + ??? + } + + }*/ + + override def currentModel: AdaBoostModel = { + new AdaBoostModel(weakHypotheses.toArray, weakHypothesisWeights.toArray, parent, paramMap) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala new file mode 100644 index 0000000000000..5d146f6724958 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.classification + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.evaluation.ClassificationEvaluator +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.ml._ +import org.apache.spark.ml.impl.estimator.{HasDefaultEvaluator, PredictionModel, Predictor, + PredictorParams} +import org.apache.spark.rdd.RDD + +@AlphaComponent +private[classification] trait ClassifierParams extends PredictorParams + +/** + * Single-label binary or multiclass classification + */ +abstract class Classifier[Learner <: Classifier[Learner, M], M <: ClassificationModel[M]] + extends Predictor[Learner, M] + with ClassifierParams + with HasDefaultEvaluator { + + override def defaultEvaluator: Evaluator = new ClassificationEvaluator +} + + +private[ml] abstract class ClassificationModel[M <: ClassificationModel[M]] + extends PredictionModel[M] with ClassifierParams { + + def numClasses: Int + + /** + * Predict label for the given features. Labels are indexed {0, 1, ..., numClasses - 1}. + * This default implementation for classification predicts the index of the maximum value + * from [[predictRaw()]]. + */ + override def predict(features: Vector): Double = { + predictRaw(features).toArray.zipWithIndex.maxBy(_._1)._2 + } + + /** + * Raw prediction for each possible label + * @return vector where element i is the raw score for label i + */ + def predictRaw(features: Vector): Vector + + /** + * Compute this model's accuracy on the given dataset. + */ + def accuracy(dataset: RDD[LabeledPoint]): Double = { + // TODO: Handle instance weights. + val predictionsAndLabels = dataset.map(lp => predict(lp.features)) + .zip(dataset.map(_.label)) + ClassificationEvaluator.computeMetric(predictionsAndLabels, "accuracy") + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b46a5cd8bdf29..19b384cf04316 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -18,11 +18,12 @@ package org.apache.spark.ml.classification import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.ml._ +import org.apache.spark.ml.LabeledPoint +import org.apache.spark.ml.impl.estimator.ProbabilisticClassificationModel import org.apache.spark.ml.param._ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS -import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT} -import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.{Vectors, BLAS, Vector} +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.Dsl._ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} @@ -33,46 +34,31 @@ import org.apache.spark.storage.StorageLevel * Params for logistic regression. */ @AlphaComponent -private[classification] trait LogisticRegressionParams extends Params - with HasRegParam with HasMaxIter with HasLabelCol with HasThreshold with HasFeaturesCol - with HasScoreCol with HasPredictionCol { +private[classification] trait LogisticRegressionParams extends ClassifierParams + with HasRegParam with HasMaxIter with HasThreshold with HasScoreCol { - /** - * Validates and transforms the input schema with the provided param map. 
- * @param schema input schema - * @param paramMap additional parameters - * @param fitting whether this is in fitting - * @return output schema - */ - protected def validateAndTransformSchema( + override protected def validateAndTransformSchema( schema: StructType, paramMap: ParamMap, fitting: Boolean): StructType = { + val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting) val map = this.paramMap ++ paramMap - val featuresType = schema(map(featuresCol)).dataType - // TODO: Support casting Array[Double] and Array[Float] to Vector. - require(featuresType.isInstanceOf[VectorUDT], - s"Features column ${map(featuresCol)} must be a vector column but got $featuresType.") - if (fitting) { - val labelType = schema(map(labelCol)).dataType - require(labelType == DoubleType, - s"Cannot convert label column ${map(labelCol)} of type $labelType to a double column.") - } - val fieldNames = schema.fieldNames + val fieldNames = parentSchema.fieldNames require(!fieldNames.contains(map(scoreCol)), s"Score column ${map(scoreCol)} already exists.") - require(!fieldNames.contains(map(predictionCol)), - s"Prediction column ${map(predictionCol)} already exists.") - val outputFields = schema.fields ++ Seq( - StructField(map(scoreCol), DoubleType, false), - StructField(map(predictionCol), DoubleType, false)) + val outputFields = parentSchema.fields ++ Seq( + StructField(map(scoreCol), DoubleType, nullable = false)) StructType(outputFields) } } + /** * Logistic regression. */ -class LogisticRegression extends Estimator[LogisticRegressionModel] with LogisticRegressionParams { +class LogisticRegression extends Classifier[LogisticRegression, LogisticRegressionModel] + with LogisticRegressionParams { + + // TODO: Extend IterativeEstimator setRegParam(0.1) setMaxIter(100) @@ -80,35 +66,31 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti def setRegParam(value: Double): this.type = set(regParam, value) def setMaxIter(value: Int): this.type = set(maxIter, value) - def setLabelCol(value: String): this.type = set(labelCol, value) def setThreshold(value: Double): this.type = set(threshold, value) - def setFeaturesCol(value: String): this.type = set(featuresCol, value) def setScoreCol(value: String): this.type = set(scoreCol, value) - def setPredictionCol(value: String): this.type = set(predictionCol, value) - override def fit(dataset: DataFrame, paramMap: ParamMap): LogisticRegressionModel = { - transformSchema(dataset.schema, paramMap, logging = true) - val map = this.paramMap ++ paramMap - val instances = dataset.select(map(labelCol), map(featuresCol)) - .map { case Row(label: Double, features: Vector) => - LabeledPoint(label, features) - }.persist(StorageLevel.MEMORY_AND_DISK) + def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): LogisticRegressionModel = { + val oldDataset = dataset.map { case LabeledPoint(label: Double, features: Vector, weight) => + org.apache.spark.mllib.regression.LabeledPoint(label, features) + } + val handlePersistence = oldDataset.getStorageLevel == StorageLevel.NONE + if (handlePersistence) { + oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + } val lr = new LogisticRegressionWithLBFGS lr.optimizer - .setRegParam(map(regParam)) - .setNumIterations(map(maxIter)) - val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights) - instances.unpersist() - // copy model params - Params.inheritValues(map, this, lrm) + .setRegParam(paramMap(regParam)) + .setNumIterations(paramMap(maxIter)) + val model = lr.run(oldDataset) + 
val lrm = new LogisticRegressionModel(this, paramMap, model.weights, model.intercept) + if (handlePersistence) { + oldDataset.unpersist() + } lrm } - - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap, fitting = true) - } } + /** * :: AlphaComponent :: * Model produced by [[LogisticRegression]]. @@ -117,16 +99,22 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti class LogisticRegressionModel private[ml] ( override val parent: LogisticRegression, override val fittingParamMap: ParamMap, - weights: Vector) - extends Model[LogisticRegressionModel] with LogisticRegressionParams { + val weights: Vector, + val intercept: Double) + extends ClassificationModel[LogisticRegressionModel] + with ProbabilisticClassificationModel + with LogisticRegressionParams { def setThreshold(value: Double): this.type = set(threshold, value) - def setFeaturesCol(value: String): this.type = set(featuresCol, value) def setScoreCol(value: String): this.type = set(scoreCol, value) - def setPredictionCol(value: String): this.type = set(predictionCol, value) - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap, fitting = false) + private val margin: Vector => Double = (features) => { + BLAS.dot(features, weights) + intercept + } + + private val score: Vector => Double = (features) => { + val m = margin(features) + 1.0 / (1.0 + math.exp(-m)) } override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { @@ -144,4 +132,24 @@ class LogisticRegressionModel private[ml] ( .select($"*", scoreFunction(col(map(featuresCol))).as(map(scoreCol))) .select($"*", predictFunction(col(map(scoreCol))).as(map(predictionCol))) } + + override val numClasses: Int = 2 + + /** + * Predict label for the given feature vector. + * The behavior of this can be adjusted using [[threshold]]. 
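+ * For example, setThreshold(0.6) makes the model predict 1 only when the estimated probability of class 1 exceeds 0.6.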
+ */ + override def predict(features: Vector): Double = { + if (score(features) > paramMap(threshold)) 1 else 0 + } + + override def predictProbabilities(features: Vector): Vector = { + val s = score(features) + Vectors.dense(Array(1.0 - s, s)) + } + + override def predictRaw(features: Vector): Vector = { + val m = margin(features) + Vectors.dense(Array(-m, m)) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala new file mode 100644 index 0000000000000..fb88bb387d15a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -0,0 +1,67 @@ +package org.apache.spark.ml.classification + +import org.apache.spark.ml.LabeledPoint +import org.apache.spark.ml.param.{HasSmoothingParam, ParamMap} +import org.apache.spark.mllib.linalg.{BLAS, DenseVector, Matrices, Matrix, Vector, Vectors} +import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes} +import org.apache.spark.rdd.RDD + + +private[classification] trait NaiveBayesParams extends ClassifierParams with HasSmoothingParam { + // TODO: override validateAndTransformSchema to check smoothingParam validity +} + +class NaiveBayes extends Classifier[NaiveBayes, NaiveBayesModel] with NaiveBayesParams { + + setSmoothingParam(1.0) + + def setSmoothingParam(value: Double): this.type = set(smoothingParam, value) + + override def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): NaiveBayesModel = { + val oldDataset = dataset.map { case LabeledPoint(label: Double, features: Vector, weight) => + org.apache.spark.mllib.regression.LabeledPoint(label, features) + } + val nb = OldNaiveBayes.train(oldDataset, paramMap(smoothingParam)) + val numClasses = nb.theta.size + val numFeatures = nb.theta(0).size + // Arrange theta into column-major format. + val thetaData = new Array[Double](numClasses * numFeatures) + var j = 0 + var k = 0 // index into thetaData + while (j < numFeatures) { + var i = 0 + while (i < numClasses) { + thetaData(k) = nb.theta(i)(j) + i += 1 + k += 1 + } + j += 1 + } + val theta: Matrix = Matrices.dense(numClasses, numFeatures, thetaData) + new NaiveBayesModel(this, paramMap, new DenseVector(nb.pi), theta) + } +} + +// TODO: Extend ProbabilisticClassificationModel +// TODO: I removed 'labels' since that functionality should be in Classifier. +/** + * @param pi log of class priors, whose dimension is C, number of labels + * @param theta log of class conditional probabilities, whose dimension is C-by-D, + * where D is number of features + */ +class NaiveBayesModel private[ml] ( + override val parent: NaiveBayes, + override val fittingParamMap: ParamMap, + val pi: DenseVector, + val theta: Matrix) + extends ClassificationModel[NaiveBayesModel] with NaiveBayesParams { + + override val numClasses: Int = pi.size + + override def predictRaw(features: Vector): Vector = { + // TODO: Generalize BLAS.gemv to take Vector (not just DenseVector).
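+ // Raw score for class i is pi(i) + theta(i, :) * features, i.e., the class log-posterior up to an additive constant (pi and theta already hold log values).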
+ val pred = theta.multiply(new DenseVector(features.toArray)) + BLAS.axpy(1.0, pi, pred) + pred + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 1979ab9eb6516..8b51d9d387983 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -21,6 +21,7 @@ import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ import org.apache.spark.ml.param._ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.types.DoubleType @@ -56,8 +57,16 @@ class BinaryClassificationEvaluator extends Evaluator with Params .map { case Row(score: Double, label: Double) => (score, label) } + BinaryClassificationEvaluator.computeMetric(scoreAndLabels, map(metricName)) + } + +} + +private[ml] object BinaryClassificationEvaluator { + + def computeMetric(scoreAndLabels: RDD[(Double, Double)], metricName: String): Double = { val metrics = new BinaryClassificationMetrics(scoreAndLabels) - val metric = map(metricName) match { + val metric = metricName match { case "areaUnderROC" => metrics.areaUnderROC() case "areaUnderPR" => @@ -68,4 +77,5 @@ class BinaryClassificationEvaluator extends Evaluator with Params metrics.unpersist() metric } + } diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClassificationEvaluator.scala new file mode 100644 index 0000000000000..be204832cf514 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClassificationEvaluator.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.evaluation.impl.PredictionEvaluator +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.evaluation.MulticlassMetrics +import org.apache.spark.rdd.RDD + +/** + * :: AlphaComponent :: + * Evaluator for single-label multiclass classification, + * which expects two input columns: prediction and label. 
+ */ +@AlphaComponent +class ClassificationEvaluator extends PredictionEvaluator { + + /** param for metric name in evaluation */ + val metricName: Param[String] = new Param(this, "metricName", + "metric name in evaluation (accuracy)", Some("accuracy")) + def getMetricName: String = get(metricName) + def setMetricName(value: String): this.type = set(metricName, value) + + protected override def evaluateImpl(predictionsAndLabels: RDD[(Double, Double)]): Double = { + val map = this.paramMap ++ paramMap + ClassificationEvaluator.computeMetric(predictionsAndLabels, map(metricName)) + } +} + +private[ml] object ClassificationEvaluator { + + def computeMetric(predictionsAndLabels: RDD[(Double, Double)], metricName: String): Double = { + val metrics = new MulticlassMetrics(predictionsAndLabels) + metricName match { + case "accuracy" => + metrics.precision + case other => + throw new IllegalArgumentException(s"Does not support metric $other.") + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala new file mode 100644 index 0000000000000..b82ad4c0d02b3 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.evaluation.impl.PredictionEvaluator +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.evaluation.RegressionMetrics +import org.apache.spark.rdd.RDD + +/** + * :: AlphaComponent :: + * Evaluator for single-label regression, + * which expects two input columns: prediction and label. 
+ */ +@AlphaComponent +class RegressionEvaluator extends PredictionEvaluator { + + /** param for metric name in evaluation */ + val metricName: Param[String] = new Param(this, "metricName", + "metric name in evaluation (RMSE)", Some("RMSE")) + def getMetricName: String = get(metricName) + def setMetricName(value: String): this.type = set(metricName, value) + + protected override def evaluateImpl(predictionsAndLabels: RDD[(Double, Double)]): Double = { + val map = this.paramMap ++ paramMap + RegressionEvaluator.computeMetric(predictionsAndLabels, map(metricName)) + } +} + +private[ml] object RegressionEvaluator { + + def computeMetric(predictionsAndLabels: RDD[(Double, Double)], metricName: String): Double = { + val metrics = new RegressionMetrics(predictionsAndLabels) + metricName match { + case "RMSE" => + metrics.rootMeanSquaredError + case other => + throw new IllegalArgumentException(s"Does not support metric $other.") + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/impl/PredictionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/impl/PredictionEvaluator.scala new file mode 100644 index 0000000000000..bda5351ea672f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/impl/PredictionEvaluator.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation.impl + +import org.apache.spark.ml._ +import org.apache.spark.ml.param._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DoubleType, Row, SchemaRDD} + +/** + * Evaluator for single-label prediction problems, + * which expects two input columns: prediction and label. 
+ */ +private[ml] abstract class PredictionEvaluator extends Evaluator with Params + with HasPredictionCol with HasLabelCol { + + def setPredictionCol(value: String): this.type = set(predictionCol, value) + def setLabelCol(value: String): this.type = set(labelCol, value) + + override def evaluate(dataset: SchemaRDD, paramMap: ParamMap): Double = { + val map = this.paramMap ++ paramMap + + val schema = dataset.schema + val predictionType = schema(map(predictionCol)).dataType + require(predictionType == DoubleType, + s"Prediction column ${map(predictionCol)} must be double type but found $predictionType") + val labelType = schema(map(labelCol)).dataType + require(labelType == DoubleType, + s"Label column ${map(labelCol)} must be double type but found $labelType") + + import dataset.sqlContext._ + val predictionsAndLabels = dataset.select(map(predictionCol).attr, map(labelCol).attr) + .map { case Row(prediction: Double, label: Double) => + (prediction, label) + } + + evaluateImpl(predictionsAndLabels) + } + + /** + * Developers can implement this method for evaluators taking (prediction, label) tuples + */ + protected def evaluateImpl(predictionsAndLabels: RDD[(Double, Double)]): Double +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/HasDefaultEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/HasDefaultEvaluator.scala new file mode 100644 index 0000000000000..af57e5e4a2a3e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/HasDefaultEvaluator.scala @@ -0,0 +1,12 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.ml.Evaluator + +trait HasDefaultEvaluator { + + /** + * Default evaluation metric usable for model validation (e.g., with CrossValidator). + */ + def defaultEvaluator: Evaluator + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeEstimator.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeEstimator.scala new file mode 100644 index 0000000000000..484017b786fbd --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeEstimator.scala @@ -0,0 +1,18 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.sql.SchemaRDD + +import org.apache.spark.ml.Model +import org.apache.spark.ml.param.ParamMap + +/** + * Trait for an iterative estimator which permits access to its underlying iterative optimization + * algorithm. + * Classes implementing this trait can create an [[IterativeSolver]], which operates on a static + * dataset using an iterative algorithm. + */ +trait IterativeEstimator[M <: Model[M]] { + + private[ml] def createSolver(dataset: SchemaRDD, paramMap: ParamMap): IterativeSolver[M] + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeSolver.scala new file mode 100644 index 0000000000000..b3c2c987da912 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/IterativeSolver.scala @@ -0,0 +1,54 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.ml.Model + +/** + * Iterative stateful solver for an [[IterativeEstimator]]. + * This type of estimator holds state (a fixed dataset and a model) and allows for iterative + * optimization. + * + * This type is distinct from an Optimizer in that an Optimizer has no concept of + * a [[Model]]; an [[IterativeSolver]] can produce a [[Model]] at any time. 
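+ * + * A minimal sketch of how a solver might be driven (the names `est`, `dataset`, and `paramMap` are illustrative): + * {{{ + * val solver = est.createSolver(dataset, paramMap) + * while (solver.step()) { + * // e.g., inspect solver.currentModel or solver.numIterations between steps + * } + * val model = solver.currentModel + * }}}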
+ * + * This type is not an [[org.apache.spark.ml.Estimator]], but it can produce a model. + * + * Notes to developers: + * - This class allows an algorithm such as LinearRegression to produce an IterativeSolver + * even when the underlying optimization is non-iterative (such as matrix inversion). + * In that case, the step() method can be called once, after which it will do nothing. + */ +abstract class IterativeSolver[M <: Model[M]] { + + protected var currentIteration: Int = 0 + + /** + * Run one step (iteration) of learning. + * @return True if the step completed. + * False if learning had already finished, or if the step failed. + */ + def step(): Boolean = { + // This handles the iteration limit. Developers can implement stepImpl(). + if (currentIteration >= maxIterations) return false + val res = stepImpl() + if (res) currentIteration += 1 + res + } + + /** + * Same as step, except that it ignores the iteration limit. + */ + protected def stepImpl(): Boolean + + /** + * Get the current model. + */ + def currentModel: M + + def maxIterations: Int + + /** + * Number of iterations completed so far. + * If [[step()]] returns false, this count will not be incremented. + */ + def numIterations: Int = currentIteration +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala new file mode 100644 index 0000000000000..89e82c36dd52e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala @@ -0,0 +1,135 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.{Estimator, LabeledPoint, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.Star + +@AlphaComponent +private[ml] trait PredictorParams extends Params + with HasLabelCol with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema with the provided param map. + * @param schema input schema + * @param paramMap additional parameters + * @param fitting whether this is in fitting + * @return output schema + */ + protected def validateAndTransformSchema( + schema: StructType, + paramMap: ParamMap, + fitting: Boolean): StructType = { + val map = this.paramMap ++ paramMap + val featuresType = schema(map(featuresCol)).dataType + // TODO: Support casting Array[Double] and Array[Float] to Vector. + require(featuresType.isInstanceOf[VectorUDT], + s"Features column ${map(featuresCol)} must be Vector types" + + s" but was actually $featuresType.") + if (fitting) { + val labelType = schema(map(labelCol)).dataType + require(labelType == DoubleType || labelType == IntegerType, + s"Cannot convert label column ${map(labelCol)} of type $labelType to a Double column.") + } + val fieldNames = schema.fieldNames + require(!fieldNames.contains(map(predictionCol)), + s"Prediction column ${map(predictionCol)} already exists.") + val outputFields = schema.fields ++ Seq( + StructField(map(predictionCol), DoubleType, nullable = false)) + StructType(outputFields) + } +} + +private[ml] abstract class Predictor[Learner <: Predictor[Learner, M], M <: PredictionModel[M]] + extends Estimator[M] with PredictorParams { + + // TODO: Eliminate asInstanceOf and see if that works. 
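+ // These shared setters return Learner (the concrete subclass type) so calls can be chained with subclass-specific setters; the cast should be safe because Learner is bound to the actual subclass by the self-referential type parameter.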
+ def setLabelCol(value: String): Learner = set(labelCol, value).asInstanceOf[Learner] + def setFeaturesCol(value: String): Learner = set(featuresCol, value).asInstanceOf[Learner] + def setPredictionCol(value: String): Learner = set(predictionCol, value).asInstanceOf[Learner] + + protected def selectLabelColumn(dataset: SchemaRDD, paramMap: ParamMap): RDD[Double] = { + import dataset.sqlContext._ + val map = this.paramMap ++ paramMap + dataset.select(map(labelCol).attr).map { + case Row(label: Double) => label + case Row(label: Int) => label.toDouble + } + } + + private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap, fitting = true) + } + + override def fit(dataset: SchemaRDD, paramMap: ParamMap): M = { + transformSchema(dataset.schema, paramMap, logging = true) + import dataset.sqlContext._ + val map = this.paramMap ++ paramMap + val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr) + .map { case Row(label: Double, features: Vector) => + LabeledPoint(label, features) + } + val model = train(instances, map) + // copy model params + Params.inheritValues(map, this, model) + model + } + + /** + * Notes to developers: + * - Unlike [[fit()]], this method takes [[paramMap]] which has already been + * combined with the internal paramMap. + * - This should handle caching the dataset if needed. + * @param dataset Training data + * @param paramMap Parameters for training. + */ + def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): M +} + +private[ml] abstract class PredictionModel[M <: PredictionModel[M]] + extends Model[M] with PredictorParams { + + def setFeaturesCol(value: String): M = set(featuresCol, value).asInstanceOf[M] + + def setPredictionCol(value: String): M = set(predictionCol, value).asInstanceOf[M] + + private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap, fitting = false) + } + + /** + * Transforms dataset by reading from [[featuresCol]], calling [[predict( )]], and storing + * the predictions as a new column [[predictionCol]]. + * This default implementation should be overridden as needed. + * @param dataset input dataset + * @param paramMap additional parameters, overwrite embedded params + * @return transformed dataset with [[predictionCol]] of type [[Double]] + */ + override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + import org.apache.spark.sql.catalyst.dsl._ + import dataset.sqlContext._ + + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + val pred: Vector => Double = (features) => { + predict(features) + } + dataset.select(Star(None), pred.call(map(featuresCol).attr) as map(predictionCol)) + } + + /** + * Default implementation. + * Override for efficiency; e.g., this does not broadcast the model. + */ + def predict(dataset: RDD[Vector]): RDD[Double] = { + dataset.map(predict) + } + + /** + * Predict label for the given features. 
+ */ + def predict(features: Vector): Double +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala new file mode 100644 index 0000000000000..a44d4c10a78a4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala @@ -0,0 +1,12 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.mllib.linalg.Vector + +private[ml] trait ProbabilisticClassificationModel { + + /** + * Predict the probability of each label. + */ + def predictProbabilities(features: Vector): Vector + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/WeakLearner.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/WeakLearner.scala new file mode 100644 index 0000000000000..e9e7628f20f22 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/WeakLearner.scala @@ -0,0 +1,19 @@ +package org.apache.spark.ml.impl.estimator + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.ml.LabeledPoint +import org.apache.spark.ml.param.ParamMap + +/** + * Trait indicating that this algorithm is optimized for being used as a weak learner for boosting, + * bagging, and other meta-algorithms. + */ +trait WeakLearner[M <: PredictionModel[M]] { + + def getNativeFeatureRDD(dataset: SchemaRDD, paramMap: ParamMap): RDD[Vector] + + def trainNative(dataset: RDD[LabeledPoint], paramMap: ParamMap): M + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala index ef141d3eb2b06..0e30377d7285a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala @@ -23,6 +23,13 @@ private[ml] trait HasRegParam extends Params { def getRegParam: Double = get(regParam) } +private[ml] trait HasSmoothingParam extends Params { + /** param for smoothing parameter */ + val smoothingParam: DoubleParam = + new DoubleParam(this, "smoothingParam", "smoothing parameter") + def getSmoothingParam: Double = get(smoothingParam) +} + private[ml] trait HasMaxIter extends Params { /** param for max number of iterations */ val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations") @@ -72,3 +79,9 @@ private[ml] trait HasOutputCol extends Params { val outputCol: Param[String] = new Param(this, "outputCol", "output column name") def getOutputCol: String = get(outputCol) } + +private[ml] trait HasWeightCol extends Params { + /** param for instance weight column name */ + val weightCol: Param[String] = new Param(this, "weightCol", "instance weight column name") + def getWeightCol: String = get(weightCol) +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala new file mode 100644 index 0000000000000..8a6af90857dd1 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -0,0 +1,72 @@ +package org.apache.spark.ml.regression + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.LabeledPoint +import org.apache.spark.ml.param.{ParamMap, HasMaxIter, HasRegParam} +import org.apache.spark.mllib.linalg.{BLAS, Vector} +import 
org.apache.spark.mllib.regression.LinearRegressionWithSGD +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +/** + * :: AlphaComponent :: + * Params for linear regression. + */ +@AlphaComponent +private[regression] trait LinearRegressionParams extends RegressorParams + with HasRegParam with HasMaxIter + + +/** + * Logistic regression. + */ +class LinearRegression extends Regressor[LinearRegression, LinearRegressionModel] + with LinearRegressionParams { + + // TODO: Extend IterativeEstimator + + setRegParam(0.1) + setMaxIter(100) + + def setRegParam(value: Double): this.type = set(regParam, value) + def setMaxIter(value: Int): this.type = set(maxIter, value) + + def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): LinearRegressionModel = { + val oldDataset = dataset.map { case LabeledPoint(label: Double, features: Vector, weight) => + org.apache.spark.mllib.regression.LabeledPoint(label, features) + } + val handlePersistence = oldDataset.getStorageLevel == StorageLevel.NONE + if (handlePersistence) { + oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + } + val lr = new LinearRegressionWithSGD() + lr.optimizer + .setRegParam(paramMap(regParam)) + .setNumIterations(paramMap(maxIter)) + val model = lr.run(oldDataset) + val lrm = new LinearRegressionModel(this, paramMap, model.weights, model.intercept) + if (handlePersistence) { + oldDataset.unpersist() + } + lrm + } +} + + +/** + * :: AlphaComponent :: + * Model produced by [[LinearRegression]]. + */ +@AlphaComponent +class LinearRegressionModel private[ml] ( + override val parent: LinearRegression, + override val fittingParamMap: ParamMap, + val weights: Vector, + val intercept: Double) + extends RegressionModel[LinearRegressionModel] + with LinearRegressionParams { + + override def predict(features: Vector): Double = { + BLAS.dot(features, weights) + intercept + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala new file mode 100644 index 0000000000000..d2009d8610a1c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala @@ -0,0 +1,33 @@ +package org.apache.spark.ml.regression + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Evaluator +import org.apache.spark.ml.evaluation.RegressionEvaluator +import org.apache.spark.ml.impl.estimator.{PredictionModel, HasDefaultEvaluator, Predictor, + PredictorParams} +import org.apache.spark.mllib.linalg.Vector + +@AlphaComponent +private[regression] trait RegressorParams extends PredictorParams + +/** + * Single-label regression + */ +abstract class Regressor[Learner <: Regressor[Learner, M], M <: RegressionModel[M]] + extends Predictor[Learner, M] + with RegressorParams + with HasDefaultEvaluator { + + override def defaultEvaluator: Evaluator = new RegressionEvaluator +} + + +private[ml] abstract class RegressionModel[M <: RegressionModel[M]] + extends PredictionModel[M] with RegressorParams { + + /** + * Predict real-valued label for the given features. 
+ */ + def predict(features: Vector): Double + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 5d51c51346665..ea4301e6467c3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -22,6 +22,7 @@ import com.github.fommil.netlib.F2jBLAS import org.apache.spark.Logging import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ +import org.apache.spark.ml.impl.estimator.HasDefaultEvaluator import org.apache.spark.ml.param.{IntParam, Param, ParamMap, Params} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.sql.DataFrame @@ -70,7 +71,18 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP transformSchema(dataset.schema, paramMap, logging = true) val sqlCtx = dataset.sqlContext val est = map(estimator) - val eval = map(evaluator) + val eval = if (map.contains(evaluator)) { + map(evaluator) + } else { + est match { + case e: HasDefaultEvaluator => + e.defaultEvaluator + case _ => + throw new IllegalArgumentException("CrossValidator could not find an evaluator to use." + + s" The estimator $est does not have a defaultEvaluator, so you must specify the" + + s" evaluator parameter for CrossValidator.") + } + } val epm = map(estimatorParamMaps) val numModels = epm.size val metrics = new Array[Double](epm.size)