Commit 27582a4

doc changes
mengxr committed Nov 10, 2014
1 parent 73a000b commit 27582a4
Showing 13 changed files with 98 additions and 77 deletions.
@@ -17,14 +17,15 @@

package org.apache.spark.examples.ml

import org.apache.spark.sql.{StringType, DataType, SQLContext}

import scala.beans.BeanInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.{Pipeline, UnaryTransformer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SQLContext

@BeanInfo
case class LabeledDocument(id: Long, text: String, label: Double)
@@ -35,10 +36,15 @@ case class Document(id: Long, text: String)
/**
* A tokenizer that converts the input string to lowercase and then splits it on whitespace.
*/
class MyTokenizer extends UnaryTransformer[String, Seq[String], MyTokenizer]
with Serializable {
override def createTransformFunc(paramMap: ParamMap): String => Seq[String] =
class MyTokenizer extends UnaryTransformer[String, Seq[String], MyTokenizer] {

override def createTransformFunc(paramMap: ParamMap): String => Seq[String] = {
_.toLowerCase.split("\\s")
}

override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType, s"Input type must be string type but got $inputType.")
}
}

/**
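For context, a minimal usage sketch (not part of this commit; variable and column names are assumed for illustration) of how a custom tokenizer like the one above might be chained with the HashingTF and LogisticRegression stages imported at the top of this example:

// Illustrative sketch only. Assumes the setInputCol/setOutputCol setters on
// UnaryTransformer and a setStages setter on Pipeline, shown or implied
// elsewhere in this diff.
val tokenizer = new MyTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))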
3 changes: 2 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Evaluator.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.SchemaRDD
abstract class Evaluator extends Identifiable {

/**
* Evaluate the output
* Evaluates the output.
*
* @param dataset a dataset that contains labels/observations and predictions.
* @param paramMap parameter map that specifies the input columns and output metrics
* @return metric
3 changes: 2 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Identifiable.scala
@@ -25,7 +25,8 @@ import java.util.UUID
trait Identifiable extends Serializable {

/**
* A unique id for the object.
* A unique id for the object. The default implementation concatenates the class name, "-", and 8
* random hex chars.
*/
val uid: String = this.getClass.getSimpleName + "-" + UUID.randomUUID().toString.take(8)
}
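As a small illustration (not from the diff) of the default above, generated ids are the simple class name followed by a dash and 8 hex characters:

// Illustrative only: the exact hex suffix is random per instance.
val tokenizer = new MyTokenizer()
println(tokenizer.uid)  // e.g. "MyTokenizer-7f3a91bc"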
23 changes: 20 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -19,18 +19,33 @@ package org.apache.spark.ml

import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.sql.{StructType, SchemaRDD}
import org.apache.spark.sql.{SchemaRDD, StructType}

/**
* A stage in a pipeline, either an Estimator or a Transformer.
*/
abstract class PipelineStage {
abstract class PipelineStage extends Serializable with Logging {

/**
* Derives the output schema from the input schema and parameters.
*/
def transform(schema: StructType, paramMap: ParamMap): StructType

/**
* Derives the output schema from the input schema and parameters, optionally with logging.
*/
protected def transform(schema: StructType, paramMap: ParamMap, logging: Boolean): StructType = {
if (logging) {
logDebug(s"Input schema: ${schema.json}")
}
val outputSchema = transform(schema, paramMap)
if (logging) {
logDebug(s"Expected output schema: ${outputSchema.json}")
}
outputSchema
}
}
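A hedged sketch (illustrative class name, not part of this commit) of the contract above: a concrete stage implements the abstract schema-level transform, and its dataset-level entry points call the protected overload first to log the input and expected output schemas, as the fit and transform implementations later in this diff do:

// Illustrative only: a trivial stage whose output schema equals its input schema.
// StructType and ParamMap imports are assumed to be in scope.
class NoOpStage extends PipelineStage {
  override def transform(schema: StructType, paramMap: ParamMap): StructType = schema
  // A dataset-level method would typically begin with:
  //   transform(dataset.schema, paramMap, logging = true)
}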

/**
@@ -43,6 +58,7 @@ class Pipeline extends Estimator[PipelineModel] {
def getStages: Array[PipelineStage] = get(stages)

override def fit(dataset: SchemaRDD, paramMap: ParamMap): PipelineModel = {
transform(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val theStages = map(stages)
// Search for the last estimator.
@@ -89,7 +105,7 @@ class Pipeline extends Estimator[PipelineModel] {
class PipelineModel(
override val parent: Pipeline,
override val fittingParamMap: ParamMap,
val transformers: Array[Transformer]) extends Model {
val transformers: Array[Transformer]) extends Model with Logging {

/**
* Gets the model produced by the input estimator. Throws a NoSuchElementException if the input
@@ -110,6 +126,7 @@ class PipelineModel(
}

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
transform(dataset.schema, paramMap, logging = true)
transformers.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, paramMap))
}

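To make the fit/transform flow above concrete, a short usage sketch (identifiers assumed for illustration):

// Illustrative only: fitting a Pipeline yields a PipelineModel; its transform
// applies each stored transformer in order, as in the foldLeft above.
val model: PipelineModel = pipeline.fit(trainingData)
val predictions: SchemaRDD = model.transform(testData)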
15 changes: 10 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -17,17 +17,17 @@

package org.apache.spark.ml

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.types.{StructField, StructType}

import scala.annotation.varargs
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.Logging
import org.apache.spark.ml.param._
import org.apache.spark.sql.{DataType, SchemaRDD}
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.types._

/**
* Abstract class for transformers that transform one dataset into another.
@@ -72,7 +72,7 @@ abstract class Transformer extends PipelineStage with Params {
* result as a new column.
*/
abstract class UnaryTransformer[IN, OUT: TypeTag, SELF <: UnaryTransformer[IN, OUT, SELF]]
extends Transformer with HasInputCol with HasOutputCol {
extends Transformer with HasInputCol with HasOutputCol with Logging {

def setInputCol(value: String): SELF = { set(inputCol, value); this.asInstanceOf[SELF] }
def setOutputCol(value: String): SELF = { set(outputCol, value); this.asInstanceOf[SELF] }
@@ -103,6 +103,11 @@ abstract class UnaryTransformer[IN, OUT: TypeTag, SELF <: UnaryTransformer[IN, O
}

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
transform(dataset.schema, paramMap, logging = true)
val inputSchema = dataset.schema
logDebug(s"Input schema: ${inputSchema.json}")
val outputSchema = transform(dataset.schema, paramMap)
logDebug(s"Expected output schema: ${outputSchema.json}")
import dataset.sqlContext._
val map = this.paramMap ++ paramMap
val udf = this.createTransformFunc(map)
@@ -20,20 +20,19 @@ package org.apache.spark.ml.classification
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector}
import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.expressions.{Cast, Row}
import org.apache.spark.storage.StorageLevel

/**
* Params for logistic regression.
*/
trait LogisticRegressionParams extends Params with HasRegParam with HasMaxIter with HasLabelCol
with HasThreshold with HasFeaturesCol with HasScoreCol with HasPredictionCol
private[classification] trait LogisticRegressionParams extends Params
with HasRegParam with HasMaxIter with HasLabelCol with HasThreshold with HasFeaturesCol
with HasScoreCol with HasPredictionCol

/**
* Logistic regression.
@@ -53,9 +52,10 @@ class LogisticRegression extends Estimator[LogisticRegressionModel] with Logisti
def setPredictionCol(value: String): this.type = { set(predictionCol, value); this }

override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
transform(dataset.schema, paramMap, logging = true)
import dataset.sqlContext._
val map = this.paramMap ++ paramMap
val instances = dataset.select(Cast(map(labelCol).attr, DoubleType), map(featuresCol).attr)
val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
.map { case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)
}.persist(StorageLevel.MEMORY_AND_DISK)
@@ -74,23 +74,15 @@
val map = this.paramMap ++ paramMap
val featuresType = schema(map(featuresCol)).dataType
// TODO: Support casting Array[Double] and Array[Float] to Vector.
if (!featuresType.isInstanceOf[VectorUDT]) {
throw new IllegalArgumentException(
s"Features column ${map(featuresCol)} must be a vector column but got $featuresType.")
}
val validLabelTypes = Set[DataType](FloatType, DoubleType, IntegerType, BooleanType, LongType)
require(featuresType.isInstanceOf[VectorUDT],
s"Features column ${map(featuresCol)} must be a vector column but got $featuresType.")
val labelType = schema(map(labelCol)).dataType
if (!validLabelTypes.contains(labelType)) {
throw new IllegalArgumentException(
s"Cannot convert label column ${map(labelCol)} of type $labelType to a double column.")
}
require(labelType == DoubleType,
s"Cannot convert label column ${map(labelCol)} of type $labelType to a double column.")
val fieldNames = schema.fieldNames
if (fieldNames.contains(map(scoreCol))) {
throw new IllegalArgumentException(s"Score column ${map(scoreCol)} already exists.")
}
if (fieldNames.contains(map(predictionCol))) {
throw new IllegalArgumentException(s"Prediction column ${map(predictionCol)} already exists.")
}
require(!fieldNames.contains(map(scoreCol)), s"Score column ${map(scoreCol)} already exists.")
require(!fieldNames.contains(map(predictionCol)),
s"Prediction column ${map(predictionCol)} already exists.")
val outputFields = schema.fields ++ Seq(
StructField(map(scoreCol), DoubleType, false),
StructField(map(predictionCol), DoubleType, false))
@@ -115,24 +107,20 @@ class LogisticRegressionModel private[ml] (
val map = this.paramMap ++ paramMap
val featuresType = schema(map(featuresCol)).dataType
// TODO: Support casting Array[Double] and Array[Float] to Vector.
if (!featuresType.isInstanceOf[VectorUDT]) {
throw new IllegalArgumentException(
s"Features column ${map(featuresCol)} must be a vector column but got $featuresType.")
}
require(featuresType.isInstanceOf[VectorUDT],
s"Features column ${map(featuresCol)} must be a vector column but got $featuresType.")
val fieldNames = schema.fieldNames
if (fieldNames.contains(map(scoreCol))) {
throw new IllegalArgumentException(s"Score column ${map(scoreCol)} already exists.")
}
if (fieldNames.contains(map(predictionCol))) {
throw new IllegalArgumentException(s"Prediction column ${map(predictionCol)} already exists.")
}
require(!fieldNames.contains(map(scoreCol)), s"Score column ${map(scoreCol)} already exists.")
require(!fieldNames.contains(map(predictionCol)),
s"Prediction column ${map(predictionCol)} already exists.")
val outputFields = schema.fields ++ Seq(
StructField(map(scoreCol), DoubleType, false),
StructField(map(predictionCol), DoubleType, false))
StructType(outputFields)
}

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
transform(dataset.schema, paramMap, logging = true)
import dataset.sqlContext._
val map = this.paramMap ++ paramMap
val score: Vector => Double = (v) => {
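For readers following the schema checks above, a hedged sketch of input data that satisfies them, assuming the default "label" and "features" column names and this Spark version's implicit SchemaRDD conversion:

// Illustrative only: LabeledPoint carries a double label and a vector of
// features, so the converted SchemaRDD passes the require(...) checks above;
// the fitted model then appends double-valued score and prediction columns.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0))))
val lrModel = new LogisticRegression().fit(training)  // RDD-to-SchemaRDD conversion assumed in scope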
@@ -20,8 +20,7 @@ package org.apache.spark.ml.evaluation
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.{DoubleType, Row, SchemaRDD}
import org.apache.spark.storage.StorageLevel

/**
@@ -40,8 +39,17 @@ class BinaryClassificationEvaluator extends Evaluator with Params
def setLabelCol(value: String): this.type = { set(labelCol, value); this }

override def evaluate(dataset: SchemaRDD, paramMap: ParamMap): Double = {
import dataset.sqlContext._
val map = this.paramMap ++ paramMap

val schema = dataset.schema
val scoreType = schema(map(scoreCol)).dataType
require(scoreType == DoubleType,
s"Score column ${map(scoreCol)} must be double type but found $scoreType")
val labelType = schema(map(labelCol)).dataType
require(labelType == DoubleType,
s"Label column ${map(labelCol)} must be double type but found $labelType")

import dataset.sqlContext._
val scoreAndLabels = dataset.select(map(scoreCol).attr, map(labelCol).attr)
.map { case Row(score: Double, label: Double) =>
(score, label)
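A brief usage sketch (assumed identifiers, not from the diff) of the evaluator and the checks above; both the score and label columns must be double-valued, and either column name can be overridden through a ParamMap:

// Illustrative only: evaluate model output, overriding the score column name.
val evaluator = new BinaryClassificationEvaluator()
val metric = evaluator.evaluate(predictions, ParamMap(evaluator.scoreCol -> "score"))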
@@ -20,17 +20,15 @@ package org.apache.spark.ml.feature
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.sql.catalyst.types.StructField
import org.apache.spark.sql.{StructType, SchemaRDD}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.expressions.Row

/**
* Params for [[StandardScaler]] and [[StandardScalerModel]].
*/
trait StandardScalerParams extends Params with HasInputCol with HasOutputCol
private[feature] trait StandardScalerParams extends Params with HasInputCol with HasOutputCol

/**
* Standardizes features by removing the mean and scaling to unit variance using column summary
@@ -42,6 +40,7 @@ class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerP
def setOutputCol(value: String): this.type = { set(outputCol, value); this }

override def fit(dataset: SchemaRDD, paramMap: ParamMap): StandardScalerModel = {
transform(dataset.schema, paramMap, logging = true)
import dataset.sqlContext._
val map = this.paramMap ++ paramMap
val input = dataset.select(map(inputCol).attr)
@@ -78,6 +77,7 @@ class StandardScalerModel private[ml] (
def setOutputCol(value: String): this.type = { set(outputCol, value); this }

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
transform(dataset.schema, paramMap, logging = true)
import dataset.sqlContext._
val map = this.paramMap ++ paramMap
val scale: (Vector) => Vector = (v) => {
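A short usage sketch (illustrative column names; single-argument fit/transform overloads assumed) of the estimator/model pair above:

// Illustrative only: fit a StandardScaler on a vector column, then write the
// standardized vectors to a new output column.
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
val scalerModel = scaler.fit(dataset)
val scaledData = scalerModel.transform(dataset)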
3 changes: 2 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/package.scala
@@ -18,7 +18,8 @@
package org.apache.spark

/**
* "org.apache.spark.ml" is an ALPHA component that adapts the new set of machine learning APIs.
* :: AlphaComponent ::
* This is an ALPHA component that adapts the new set of machine learning APIs.
*/
package object ml {
}
4 changes: 1 addition & 3 deletions mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -42,7 +42,7 @@ class Param[T] (
/**
* Creates a param pair with the given value (for Java).
*/
def w(value: T): ParamPair[T] = ParamPair(this, value)
def w(value: T): ParamPair[T] = this -> value

/**
* Creates a param pair with the given value (for Scala).
@@ -303,5 +303,3 @@ object ParamMap {
new ParamMap().put(paramPairs: _*)
}
}
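A brief illustration (assumed usage, not part of this diff) of the two equivalent ways to build param pairs, which the w method above now delegates to:

// Illustrative only: `->` builds a ParamPair in Scala; w(...) is the
// Java-friendly spelling that forwards to it.
val lr = new LogisticRegression()
val map = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1)  // Scala style
val pair = lr.maxIter.w(20)                               // Java style, same ParamPair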


@@ -17,8 +17,6 @@

package org.apache.spark.ml.param

// shared params

private[ml] trait HasRegParam extends Params {
/** param for regularization parameter */
val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter")