[SPARK-3530][MLLIB] pipeline and parameters with examples #3099

Closed
wants to merge 55 commits

Conversation

@mengxr (Contributor) commented Nov 5, 2014

This PR adds the package "org.apache.spark.ml" with pipelines and parameters, as discussed on the JIRA. This is joint work with @jkbradley @etrain @shivaram and many others who helped on the design, with additional help from @marmbrus and @liancheng on the Spark SQL side. The design doc can be found at:

https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o/edit?usp=sharing

org.apache.spark.ml

This is a new package with a new set of ML APIs that addresses practical machine learning pipelines. (Sorry for taking so long!) It will be an alpha component, so it is definitely not set in stone. The new set of APIs, inspired by the MLI project from AMPLab and by scikit-learn, leverages Spark SQL's schema support and execution plan optimization. It introduces the following components that help build a practical pipeline:

  1. Transformer, which transforms a dataset into another
  2. Estimator, which fits models to data, where models are transformers
  3. Evaluator, which evaluates model output and returns a scalar metric
  4. Pipeline, a simple pipeline that consists of transformers and estimators

Parameters can be supplied at fit/transform time or embedded in components (a short usage sketch follows the list below):

  1. Param: a strongly typed parameter key with self-contained documentation
  2. ParamMap: a param -> value map
  3. Params: a trait for components with parameters
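
A minimal usage sketch of how these fit together (illustrative only; it reuses calls that appear in the examples below and in the discussion, such as fit with an extra param pair and ParamGridBuilder):

    // Param: lr.maxIter and lr.regParam are typed parameter keys owned by lr.
    val lr = new LogisticRegression()
      .setMaxIter(20)                       // embed a value in the component
    // ParamPair / ParamMap: override a value at fit time without mutating lr.
    val model = lr.fit(dataset, lr.regParam -> 0.01)
    // ParamMaps can also be built in bulk, e.g. for grid search.
    val grid: Array[ParamMap] = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
      .build()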

For any component that implements Params, users can easily check the documentation by calling explainParams:

> val lr = new LogisticRegression
> lr.explainParams
maxIter: max number of iterations (default: 100)
regParam: regularization constant (default: 0.1)
labelCol: label column name (default: label)
featuresCol: features column name (default: features)

or check an individual param:

> lr.maxIter
maxIter: max number of iterations (default: 100)

Please start with the example code in test suites and under org.apache.spark.examples.ml, where I put several examples:

  1. run a simple logistic regression job
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(1.0)
    val model = lr.fit(dataset)
    model.transform(dataset, model.threshold -> 0.8) // overwrite threshold
      .select('label, 'score, 'prediction).collect()
      .foreach(println)
  2. run logistic regression with cross-validation and grid search using areaUnderROC (default) as the metric
    val lr = new LogisticRegression
    val lrParamMaps = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 100.0))
      .addGrid(lr.maxIter, Array(0, 5))
      .build()
    val eval = new BinaryClassificationEvaluator
    val cv = new CrossValidator()
      .setEstimator(lr)
      .setEstimatorParamMaps(lrParamMaps)
      .setEvaluator(eval)
      .setNumFolds(3)
    val bestModel = cv.fit(dataset)
  3. run a pipeline that consists of a standard scaler and a logistic regression component
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val lr = new LogisticRegression()
      .setFeaturesCol(scaler.getOutputCol)
    val pipeline = new Pipeline()
      .setStages(Array(scaler, lr))
    val model = pipeline.fit(dataset)
    val predictions = model.transform(dataset)
      .select('label, 'score, 'prediction)
      .collect()
      .foreach(println)
  4. a simple text classification pipeline, which recognizes "spark":
    val training = sparkContext.parallelize(Seq(
      LabeledDocument(0L, "a b c d e spark", 1.0),
      LabeledDocument(1L, "b d", 0.0),
      LabeledDocument(2L, "spark f g h", 1.0),
      LabeledDocument(3L, "hadoop mapreduce", 0.0)))
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))
    val model = pipeline.fit(training)
    val test = sparkContext.parallelize(Seq(
      Document(4L, "spark i j k"),
      Document(5L, "l m"),
      Document(6L, "mapreduce spark"),
      Document(7L, "apache hadoop")))
    model.transform(test)
      .select('id, 'text, 'prediction, 'score)
      .collect()
      .foreach(println)

Java examples are very similar. I put example code that creates a simple text classification pipeline in Scala and Java, where a simple tokenizer is defined as a transformer outside org.apache.spark.ml.

What is missing now and will be added soon:

  1. Runtime check of schemas. Before we touch the data, we will go through the schema and make sure column names and types match the input parameters (see the sketch after this list).
  2. Java examples.
  3. Store training parameters in trained models.
  4. (later) Serialization and Python API.
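
A hedged sketch of what the runtime schema check in item 1 might look like (hypothetical helper and column types, written against the org.apache.spark.sql.types API of later Spark releases; in the merged code this logic lives in PipelineStage.transformSchema, per the squashed commit list near the end of this thread):

    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

    // Check column names and types against the input parameters before touching
    // any data, and declare the output column that the stage will append.
    def validateAndTransformSchema(schema: StructType, inputCol: String, outputCol: String): StructType = {
      val inputField = schema.fields.find(_.name == inputCol).getOrElse(
        throw new IllegalArgumentException(s"Input column $inputCol does not exist."))
      require(inputField.dataType == StringType,
        s"Input column $inputCol must be StringType but is ${inputField.dataType}.")
      require(!schema.fieldNames.contains(outputCol), s"Output column $outputCol already exists.")
      StructType(schema.fields :+ StructField(outputCol, ArrayType(StringType), nullable = true))
    }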

@SparkQA commented Nov 5, 2014

Test build #22907 has started for PR 3099 at commit 376db0a.

  • This patch merges cleanly.

@SparkQA commented Nov 5, 2014

Test build #22907 has finished for PR 3099 at commit 376db0a.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22907/
Test FAILed.

@@ -0,0 +1,6 @@
package org.apache.spark.ml
Member

(Missing copyright header)

Contributor Author

Hey @srowen, please ignore these. I'm not trying to make it pass Jenkins yet.

Member

Oh Ok, is it worth me reviewing this yet? I'm liking it already.

@mengxr (Contributor, Author) commented Nov 5, 2014

@srowen Glad to hear that you like it :) Your feedback will be greatly appreciated, but I don't want to waste your time on minor details. Let's keep the discussion in the main thread, so we can still see the comments when I update the code. One thing I have been struggling with is the way we handle parameters here. Please check the last section of the PR description. Basically, I think

val lr = new LogisticRegression
  .setMaxIter(50)
  .setRegParam(0.1)

looks better than

val lr = new LogisticRegression
lr.set(lr.maxIter, 50)
  .set(lr.regParam, 0.1)

But if we use setters/getters, it is a little weird that lr.maxIter is a parameter key instead of its value. Or we could let lr.maxIter store the value (the setters/getters could then be generated with @BeanProperty, though we would still need to write them by hand because setters generated by @BeanProperty don't return this), and then use something like lr.maxIter_ (underscore) as the parameter key. The code would become

val lr = new LogisticRegression
  .setRegParam(0.1) // attach regParam = 0.1 with the object "lr"
val lrParamMaps = new ParamMapBuilder()
  .addMulti(lr.maxIter_, Array(10, 20)) // specify parameters (with underscore) outside the object "lr"
  .build()
// multi-model training
val models = lr.fit(dataset, lrParamMaps)

Does it look better than the current version? Or we can just ask users to remember that lr.maxIter is the parameter key, while lr.setMaxIter() sets and lr.getMaxIter() gets its value. I tend to like this better than the underscore solution.

@srowen (Member) commented Nov 5, 2014

@mengxr Yes I agree with your conclusion. With a generic get/set by key method, I suppose you don't strictly need all the particular getters and setters. I suppose that's simpler, and if there's just one way to do it, there's no confusion about key vs getter/setter. I don't know if you want to remove the getters/setters entirely, but it wouldn't be crazy.

@sryza (Contributor) commented Nov 5, 2014

If maxIter is a constant, would it be clearer to use MAX_ITER?

@mengxr (Contributor, Author) commented Nov 5, 2014

@sryza maxIter is not a constant. It is a parameter key in the current design.

@sryza (Contributor) commented Nov 5, 2014

Both the reference and the class internals are immutable, no? Typical Java conventions would put such a variable in all caps, though maybe in Scala it's different?

I suppose the fact that Param includes a reference to its parent makes this a grayer area. What's the thinking behind linking a Param to a specific instance? My (uninformed) gut is that they seem like general attributes / capabilities of a class.

/**
 * Filter this param map for the given parent.
 */
def filter(parent: Identifiable): ParamMap = {
Contributor

Curious - what would this filtering be used for?

Contributor Author

This is useful when we try to store training parameters in trained models.

@jkbradley (Member)

I vote for the lr.setRegParam(0.1) setup. I also vote for setting parameters beforehand, and not allowing parameters in fit().

@SparkQA commented Nov 6, 2014

Test build #22962 has started for PR 3099 at commit 1ef26e0.

  • This patch merges cleanly.

@SparkQA commented Nov 6, 2014

Test build #22962 has finished for PR 3099 at commit 1ef26e0.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22962/
Test FAILed.

@shivaram (Contributor) commented Nov 6, 2014

@mengxr Thanks for putting this together! I have some high-level comments from looking at the code:

  1. Could we also have constructors, in addition to getters and setters? This is the same style we use in MLlib, and it will make users' code much cleaner. For example,
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val lr = new LogisticRegression()
      .setFeaturesCol("scaledFeatures")

will become one-liners like

val scaler = new StandardScaler(inputCol="features", outputCol="scaledFeatures")
val lr = new LogisticRegression(featuresCol="scaledFeatures")

Also, we will still have an empty constructor that just sets the default values that exist for Params right now.

Also, FWIW I am not very sure about having setters/getters via traits like HasRegParam etc. Doesn't this create a lot of traits as we add more classes, each having 2-3 params?

  2. Passing params across modules: the code to pass params from Estimator to Transformer is a little clunky right now. I think this can also be solved with the constructor interface above.

For example in LogisticRegression.scala, we can have

val lrm = new LogisticRegressionModel(lr.run(instances).weights)
if (!lrm.paramMap.contains(lrm.featuresCol) && map.contains(lrm.featuresCol)) {
   lrm.setFeaturesCol(featuresCol)
}

can become

val lrm = new LogisticRegressionModel(lr.run(instances).weights, featureCol=featuresCol)
  3. In addition to the SchemaRDD-based transformer API, it would be good to have a single-Row transformation API too. If you want to reuse the same transformers for training and for online prediction, you would want to use the same Pipeline, but with one element at a time or something like that? There are some folks in the AMPLab (cc @jegonzal @dcrankshaw) who are working on model serving, and this would be useful for integration with such systems.
    Also, I can see the single-element API being very useful for unit testing.
  4. Other than the API, the biggest worry I have is memory management. Because most of the transformations can be lazy operations, it is very tricky to figure out when the programmer should call persist/unpersist. For example, in StandardScaler.scala we now have input.cache(), but AFAIK there is no simple way to unpersist this RDD if the scaler is being run lazily.

Also, if you don't mind, I'll try to port some of the image processing pipelines we have to this class structure by forking your branch. I feel that'll help me figure out what features are easy to use, etc.

@manishamde (Contributor)

I have a few comments on the API:

  1. Like @jkbradley, I prefer lr.setMaxIter(50) over lr.set(lr.maxIter, 50). Also, I prefer to avoid passing parameters to fit like lr.fit(dataset, lr.maxIter -> 50).
  2. Constructors with getters and setters, as @shivaram pointed out, would be great. The LOC reduction is important and should not be discounted.
  3. Do we plan to provide syntactic sugar such as a predict method when we use a model to transform a dataset? For me, transform fits well with the feature engineering stage and predict after the model training has been performed.
  4. It will be great to see the corresponding examples in Python. The getters/setters would map well to Python properties. Also, it will be nice to do an apples-to-apples comparison with the scikit-learn pipeline.
  5. Finally, how do we plan to programmatically answer (developer/user) queries about algorithm properties such as multiclass classification support, the internal storage format used, etc.?

@mengxr (Contributor, Author) commented Nov 6, 2014

@shivaram Thanks for your feedback!

Could we also have constructors, in addition to getters and setters?

It would be hard to maintain binary compatibility in the long run. After we remove the @experimental tag, each time we add a new parameter we would need to create a new auxiliary constructor. With setters, it is also easier to deprecate parameters.

It will also make the Scala code different from Java's, which we try to avoid. Using getters and setters, the Java code in JavaLogisticRegressionSuite looks very similar to the Scala code in LogisticRegressionSuite.

Also, FWIW I am not very sure about having setters/getters via traits like HasRegParam etc. Doesn't this create a lot of traits as we add more classes, each having 2-3 params?

I'm okay with either approach. I felt this is a nice feature of Scala. Some parameters are very common, e.g., regParam, maxIter, and featuresCol, and it is tedious to copy the same code around. We could also group parameter traits into something like IterativeParams or RegressionModelParams later. However, when I coded this up, I found that Java doesn't interpret the return type correctly, so I have to override the setters in each class, which is bad.
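
A self-contained illustration (plain Scala, not the actual Spark code) of the return-type problem described above: a chainable setter defined once in a shared trait returns this.type, which keeps the concrete type for Scala callers, but Java callers see the trait as the return type, so chaining breaks unless each concrete class re-declares its setters.

trait HasMaxIter {
  protected var maxIter: Int = 100
  def setMaxIter(value: Int): this.type = { maxIter = value; this }
}

trait HasRegParam {
  protected var regParam: Double = 0.1
  def setRegParam(value: Double): this.type = { regParam = value; this }
}

class LogisticRegression extends HasMaxIter with HasRegParam

object ChainingDemo extends App {
  // Fine from Scala: this.type keeps the static type LogisticRegression.
  val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
  // From Java, new LogisticRegression().setMaxIter(10) is typed as HasMaxIter,
  // so the chained .setRegParam(0.01) would not compile without an override.
  println(lr)
}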

Passing params across modules.

Model parameters like featuresCol are set before training, so we still need to deal with the logic after training:

  1. If lr.model.featuresCol is set, use it.
  2. If lr.model.featuresCol is not set, use lr.featuresCol or keep the default if lr.featuresCol is not set either.

In addition to the SchemaRDD-based transformer API, it would be good to have a single-Row transformation API too.

I agree. We need to know the schema in order to transform individual instances, and the Row object doesn't have a reference to its schema. We can add the schema as a parameter, which would be required to transform a row. Btw, we will keep the predict method that works on a single Vector.

Other than the API, the biggest worry I have is memory management.

We can call unpersist inside StandardScaler after we compute the mean and the standard deviation. It is not an issue here, but I get your point. We can add pipeline components that persist/checkpoint input datasets; when to unpersist is always the problem. I'll think about it.
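
A hedged sketch of that fix (simplified and hypothetical, using the existing mllib.feature.StandardScaler for the statistics rather than the code in this PR): cache the input only while the summary statistics are computed, then release it before returning the model.

import org.apache.spark.mllib.feature.{StandardScaler => MLlibStandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def fitScaler(input: RDD[Vector]): StandardScalerModel = {
  input.cache()                   // reused while computing mean and std
  val model = new MLlibStandardScaler(withMean = true, withStd = true).fit(input)
  input.unpersist()               // statistics are materialized, safe to release
  model
}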

Also, if you don't mind, I'll try to port some of the image processing pipelines we have to this class structure by forking your branch. I feel that'll help me figure out what features are easy to use, etc.

That's great! I felt that making the code compile is really helpful for seeing the trade-offs. But please understand that this is a WIP and I may update the code.

@jegonzal (Contributor) commented Nov 6, 2014

The model serving work would really benefit from being able to evaluate models without requiring a Spark context, especially since we are shooting for latencies in the tens of milliseconds. More generally, we should think about how one might want to use the artifact of the pipeline. I suspect there are uses that may exist outside of Spark, and the extent to which the models themselves are "portable functions" could enable greater adoption.
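
A hedged sketch of that "portable function" idea (hypothetical and not part of this PR): once trained, a linear model's weights and intercept can be copied into a plain object and scored locally, with no SparkContext on the serving path.

case class LocalLogisticModel(weights: Array[Double], intercept: Double) {
  /** Probability of the positive class for one feature vector. */
  def predict(features: Array[Double]): Double = {
    require(features.length == weights.length, "dimension mismatch")
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    1.0 / (1.0 + math.exp(-margin))
  }
}

// Score a single instance locally, which is what low-latency serving needs.
val local = LocalLogisticModel(weights = Array(0.5, -1.2, 0.3), intercept = 0.1)
println(local.predict(Array(1.0, 0.0, 2.0)))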

@mengxr (Contributor, Author) commented Nov 6, 2014

@manishamde Thanks for your feedback!

Like @jkbradley, I prefer lr.setMaxIter(50) over lr.set(lr.maxIter, 50).

Me too, and this is what is implemented in the current version.

Also, I prefer to avoid passing parameters to fit like lr.fit(dataset, lr.maxIter -> 50).

  1. It is useful when you want to try some parameters interactively. For example, suppose I'm using the default settings and want to see how each parameter affects the result. I can try:
val lr = new LogisticRegression()
  .setMaxIter(20)
  .setRegParam(0.1)
lr.fit(dataset, lr.maxIter -> 50)
lr.fit(dataset, lr.regParam -> 1.0) // note that maxIter goes back to the default setting

If we only allow setters, the code will be

val previousMaxIter = lr.getMaxIter
lr.setMaxIter(50)
lr.fit(dataset)
lr.setMaxIter(previousMaxIter)
val previousRegParam = lr.getRegParam
lr.setRegParam(1.0)
lr.fit(dataset)
lr.setRegParam(previousRegParam)

Another reason I want to have parameters specified in fit is for multi-model training, as described in the design doc.

Constructors with getters and setters, as @shivaram pointed out, would be great. The LOC reduction is important and should not be discounted.

Besides the binary compatibility issue and the Java issue I mentioned, it doesn't save you many characters:

val lr = new LogisticRegression(maxIter = 50, regParam = 0.1)

val lr = new LogisticRegression()
  .setMaxIter(50)
  .setRegParam(0.1)

Do we plan to provide syntactic sugar such as a predict method when we use a model to transform a dataset? For me, transform fits well with the feature engineering stage and predict after the model training has been performed.

I think we should keep methods that operate on normal RDDs and individual instances.

It will be great to see the corresponding examples in Python. The getters/setters would map well to Python properties. Also, it will be nice to do an apples-to-apples comparison with the scikit-learn pipeline.

We need to deal with the serialization of objects and parameters; @davies is the expert. I expect the Python API to be very similar to the Scala/Java API.

Finally, how do we plan to programmatically answer (developer/user) queries about algorithm properties such as multiclass classification support, the internal storage format used, etc.?

This is beyond this PR. SPARK-3702, which @jkbradley is working on, is relevant to your question.

@SparkQA commented Nov 6, 2014

Test build #22986 has started for PR 3099 at commit 9f408ed.

  • This patch merges cleanly.

@SparkQA commented Nov 6, 2014

Test build #22986 has finished for PR 3099 at commit 9f408ed.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class Estimator[M <: Model] extends PipelineStage with Params
    • abstract class Evaluator extends Identifiable
    • trait Identifiable extends Serializable
    • abstract class Model extends Transformer
    • class Pipeline extends Estimator[PipelineModel]
    • class PipelineModel(val transformers: Array[Transformer]) extends Model
    • abstract class Transformer extends PipelineStage with Params
    • trait HasEstimator extends Params
    • trait HasEvaluator extends Params
    • trait HasEstimatorParamMaps extends Params
    • class CrossValidatorModel(bestModel: Model, metric: Double) extends Model
    • class LogisticRegressionModel(
    • class StandardScaler extends Transformer with Params with HasInputCol with HasOutputCol
    • class DoubleParam(parent: Params, name: String, doc: String, default: Option[Double] = None)
    • class IntParam(parent: Params, name: String, doc: String, default: Option[Int] = None)
    • case class ParamPair[T](param: Param[T], value: T)
    • trait Params extends Identifiable
    • class ParamGridBuilder
    • trait HasRegParam extends Params
    • trait HasMaxIter extends Params
    • trait HasFeaturesCol extends Params
    • trait HasLabelCol extends Params
    • trait HasScoreCol extends Params
    • trait HasThreshold extends Params
    • trait HasMetricName extends Params
    • trait HasInputCol extends Params
    • trait HasOutputCol extends Params

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22986/
Test FAILed.

@shivaram (Contributor)

Why can't we just remove the getModel(estimator) API for now? It doesn't seem to be used in the example pipelines from what I can see. I am -1 on adding public APIs that we know very well are insufficient to use and will most definitely be changed very shortly.

@mengxr (Contributor, Author) commented Nov 12, 2014

After a user fits a pipeline, without getModel(estimator) they need to go through each transformer in the pipeline model to find it. The example pipeline isn't meant to cover everything in this PR, but this method is very useful:

val lrModel = pipelineModel.getModel(lr)

Is this operation useful? I would say yes. If we both agree, the question becomes whether the current form is okay and what other options we have. I understand your concern, but we need to expose something that can make a "working" pipeline in order to ask users to try it. We could test this internally for months, but without users' feedback the use cases we know about would be quite limited. Users should be aware that this is alpha, and we should not be afraid of changing APIs to make things better.

@shivaram (Contributor)

How about just taking a PipelineStage as the argument? We can check internally if it is an Estimator and, if so, check whether it matches. Does that work?

While I see your point about getting feedback, I'm just trying to make sure we stick to the goal of exposing a user API and not a developer API [which, as far as I can see, there is consensus on?]. So given this goal, all I am asking is that we write the API to match it.

@mengxr (Contributor, Author) commented Nov 12, 2014

@shivaram I found that this is not the only place we need Estimator/Transformer. In CrossValidator, we need to set the underlying estimator. If we replaced it with PipelineStage there, I'm certain it would change soon. If you suggest removing CrossValidator: as I mentioned, I've been thinking about this for the entire day, and I feel that providing a pipeline without tools for tuning doesn't make sense. Focusing on the user API is indeed our goal here. On the developer API side, it is easy to tell that HasMaxIter/UnaryTransformer are developer APIs, while it is actually hard to tell whether Estimator, Transformer, and Evaluator are.

Let me try to hide APIs as much as I can and see how it goes.

@SparkQA commented Nov 12, 2014

Test build #23260 has started for PR 3099 at commit 01c125a.

  • This patch merges cleanly.

@SparkQA commented Nov 12, 2014

Test build #23260 has finished for PR 3099 at commit 01c125a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaSimpleTextClassificationPipeline
    • case class LabeledDocument(id: Long, text: String, label: Double)
    • case class Document(id: Long, text: String)
    • abstract class Estimator[M <: Model[M]] extends PipelineStage with Params
    • abstract class Evaluator extends Identifiable
    • abstract class Model[M <: Model[M]] extends Transformer
    • abstract class PipelineStage extends Serializable with Logging
    • class Pipeline extends Estimator[PipelineModel]
    • abstract class Transformer extends PipelineStage with Params
    • class LogisticRegression extends Estimator[LogisticRegressionModel] with LogisticRegressionParams
    • class HashingTF extends UnaryTransformer[Iterable[_], Vector, HashingTF]
    • class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerParams
    • class Param[T] (
    • class DoubleParam(parent: Params, name: String, doc: String, defaultValue: Option[Double] = None)
    • class IntParam(parent: Params, name: String, doc: String, defaultValue: Option[Int] = None)
    • class FloatParam(parent: Params, name: String, doc: String, defaultValue: Option[Float] = None)
    • class LongParam(parent: Params, name: String, doc: String, defaultValue: Option[Long] = None)
    • class BooleanParam(parent: Params, name: String, doc: String, defaultValue: Option[Boolean] = None)
    • case class ParamPair[T](param: Param[T], value: T)
    • trait Params extends Identifiable with Serializable
    • class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorParams with Logging
    • class ParamGridBuilder

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23260/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23259/
Test FAILed.

@mengxr (Contributor, Author) commented Nov 12, 2014

I tried to hide APIs as much as I could while keeping the code at a level where users can actually try creating, configuring, and tuning a pipeline. All major classes are marked as "AlphaComponent". The schema transformation layer is hidden. Identifiable and UnaryTransformer are marked private[ml]. As a result, I copied Tokenizer to ml.feature. Attached is a list of public classes. @mateiz

org.apache.spark.ml
Estimator
Evaluator
Model
Pipeline
PipelineModel
PipelineStage
Transformer

org.apache.spark.ml.classification
LogisticRegression
LogisticRegressionModel

org.apache.spark.ml.evaluation
BinaryClassificationEvaluator

org.apache.spark.ml.feature
HashingTF
StandardScaler
StandardScalerModel
Tokenizer

org.apache.spark.ml.param
BooleanParam
DoubleParam
FloatParam
IntParam
LongParam
Param
ParamMap
ParamPair
Params

org.apache.spark.ml.tuning
CrossValidator
CrossValidatorModel
ParamGridBuilder

@mengxr (Contributor, Author) commented Nov 12, 2014

test this please

@SparkQA commented Nov 12, 2014

Test build #23261 has started for PR 3099 at commit 2cc93fd.

  • This patch merges cleanly.

@SparkQA commented Nov 12, 2014

Test build #23262 has started for PR 3099 at commit 2cc93fd.

  • This patch merges cleanly.

@SparkQA commented Nov 12, 2014

Test build #23261 has finished for PR 3099 at commit 2cc93fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaSimpleTextClassificationPipeline
    • case class LabeledDocument(id: Long, text: String, label: Double)
    • case class Document(id: Long, text: String)
    • abstract class Estimator[M <: Model[M]] extends PipelineStage with Params
    • abstract class Evaluator extends Identifiable
    • abstract class Model[M <: Model[M]] extends Transformer
    • abstract class PipelineStage extends Serializable with Logging
    • class Pipeline extends Estimator[PipelineModel]
    • abstract class Transformer extends PipelineStage with Params
    • class LogisticRegression extends Estimator[LogisticRegressionModel] with LogisticRegressionParams
    • class HashingTF extends UnaryTransformer[Iterable[_], Vector, HashingTF]
    • class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerParams
    • class Tokenizer extends UnaryTransformer[String, Seq[String], Tokenizer]
    • class Param[T] (
    • class DoubleParam(parent: Params, name: String, doc: String, defaultValue: Option[Double] = None)
    • class IntParam(parent: Params, name: String, doc: String, defaultValue: Option[Int] = None)
    • class FloatParam(parent: Params, name: String, doc: String, defaultValue: Option[Float] = None)
    • class LongParam(parent: Params, name: String, doc: String, defaultValue: Option[Long] = None)
    • class BooleanParam(parent: Params, name: String, doc: String, defaultValue: Option[Boolean] = None)
    • case class ParamPair[T](param: Param[T], value: T)
    • trait Params extends Identifiable with Serializable
    • class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorParams with Logging
    • class ParamGridBuilder

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23261/
Test PASSed.

@SparkQA commented Nov 12, 2014

Test build #23262 has finished for PR 3099 at commit 2cc93fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaSimpleTextClassificationPipeline
    • case class LabeledDocument(id: Long, text: String, label: Double)
    • case class Document(id: Long, text: String)
    • abstract class Estimator[M <: Model[M]] extends PipelineStage with Params
    • abstract class Evaluator extends Identifiable
    • abstract class Model[M <: Model[M]] extends Transformer
    • abstract class PipelineStage extends Serializable with Logging
    • class Pipeline extends Estimator[PipelineModel]
    • abstract class Transformer extends PipelineStage with Params
    • class LogisticRegression extends Estimator[LogisticRegressionModel] with LogisticRegressionParams
    • class HashingTF extends UnaryTransformer[Iterable[_], Vector, HashingTF]
    • class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerParams
    • class Tokenizer extends UnaryTransformer[String, Seq[String], Tokenizer]
    • class Param[T] (
    • class DoubleParam(parent: Params, name: String, doc: String, defaultValue: Option[Double] = None)
    • class IntParam(parent: Params, name: String, doc: String, defaultValue: Option[Int] = None)
    • class FloatParam(parent: Params, name: String, doc: String, defaultValue: Option[Float] = None)
    • class LongParam(parent: Params, name: String, doc: String, defaultValue: Option[Long] = None)
    • class BooleanParam(parent: Params, name: String, doc: String, defaultValue: Option[Boolean] = None)
    • case class ParamPair[T](param: Param[T], value: T)
    • trait Params extends Identifiable with Serializable
    • class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorParams with Logging
    • class ParamGridBuilder

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23262/
Test PASSed.

@mateiz (Contributor) commented Nov 12, 2014

New changes look good to me.

@shivaram (Contributor)

Thanks @mengxr -- I agree that having CrossValidator is useful and parameter tuning for algorithms like LogisticRegression is a nice use case enabled by this change. It is unfortunate that we can't find a clean way to isolate the user vs. developer APIs right now.

The latest set of changes do help though and look good to me.

@mengxr (Contributor, Author) commented Nov 12, 2014

Thanks everyone for reviewing the code! I've merged this into master and branch-1.2 as an alpha component. User feedback would be greatly appreciated, as it will definitely help us improve both the user and developer experience.

@asfgit asfgit closed this in 4b736db Nov 12, 2014
asfgit pushed a commit that referenced this pull request Nov 12, 2014
Author: Xiangrui Meng <meng@databricks.com>

Closes #3099 from mengxr/SPARK-3530 and squashes the following commits:

2cc93fd [Xiangrui Meng] hide APIs as much as I can
34319ba [Xiangrui Meng] use local instead local[2] for unit tests
2524251 [Xiangrui Meng] rename PipelineStage.transform to transformSchema
c9daab4 [Xiangrui Meng] remove mockito version
1397ab5 [Xiangrui Meng] use sqlContext from LocalSparkContext instead of TestSQLContext
6ffc389 [Xiangrui Meng] try to fix unit test
a59d8b7 [Xiangrui Meng] doc updates
977fd9d [Xiangrui Meng] add scala ml package object
6d97fe6 [Xiangrui Meng] add AlphaComponent annotation
731f0e4 [Xiangrui Meng] update package doc
0435076 [Xiangrui Meng] remove ;this from setters
fa21d9b [Xiangrui Meng] update extends indentation
f1091b3 [Xiangrui Meng] typo
228a9f4 [Xiangrui Meng] do not persist before calling binary classification metrics
f51cd27 [Xiangrui Meng] rename default to defaultValue
b3be094 [Xiangrui Meng] refactor schema transform in lr
8791e8e [Xiangrui Meng] rename copyValues to inheritValues and make it do the right thing
51f1c06 [Xiangrui Meng] remove leftover code in Transformer
494b632 [Xiangrui Meng] compure score once
ad678e9 [Xiangrui Meng] more doc for Transformer
4306ed4 [Xiangrui Meng] org imports in text pipeline
6e7c1c7 [Xiangrui Meng] update pipeline
4f9e34f [Xiangrui Meng] more doc for pipeline
aa5dbd4 [Xiangrui Meng] fix typo
11be383 [Xiangrui Meng] fix unit tests
3df7952 [Xiangrui Meng] clean up
986593e [Xiangrui Meng] re-org java test suites
2b11211 [Xiangrui Meng] remove external data deps
9fd4933 [Xiangrui Meng] add unit test for pipeline
2a0df46 [Xiangrui Meng] update tests
2d52e4d [Xiangrui Meng] add @AlphaComponent to package-info
27582a4 [Xiangrui Meng] doc changes
73a000b [Xiangrui Meng] add schema transformation layer
6736e87 [Xiangrui Meng] more doc / remove HasMetricName trait
80a8b5e [Xiangrui Meng] rename SimpleTransformer to UnaryTransformer
62ca2bb [Xiangrui Meng] check param parent in set/get
1622349 [Xiangrui Meng] add getModel to PipelineModel
a0e0054 [Xiangrui Meng] update StandardScaler to use SimpleTransformer
d0faa04 [Xiangrui Meng] remove implicit mapping from ParamMap
c7f6921 [Xiangrui Meng] move ParamGridBuilder test to ParamGridBuilderSuite
e246f29 [Xiangrui Meng] re-org:
7772430 [Xiangrui Meng] remove modelParams add a simple text classification pipeline
b95c408 [Xiangrui Meng] remove implicits add unit tests to params
bab3e5b [Xiangrui Meng] update params
fe0ee92 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3530
6e86d98 [Xiangrui Meng] some code clean-up
2d040b3 [Xiangrui Meng] implement setters inside each class, add Params.copyValues [ci skip]
fd751fc [Xiangrui Meng] add java-friendly versions of fit and tranform
3f810cd [Xiangrui Meng] use multi-model training api in cv
5b8f413 [Xiangrui Meng] rename model to modelParams
9d2d35d [Xiangrui Meng] test varargs and chain model params
f46e927 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3530
1ef26e0 [Xiangrui Meng] specialize methods/types for Java
df293ed [Xiangrui Meng] switch to setter/getter
376db0a [Xiangrui Meng] pipeline and parameters

(cherry picked from commit 4b736db)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
@JoshRosen (Contributor)

It looks like this may have broken the Master Maven build: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-pre-YARN/981/

It looks like most of the failures are due to "Task not serializable" exceptions in the MLlib test suites.

@huangjs commented Nov 14, 2014

Just a question. I'm comparing your example code to the scikit-learn examples in the design document. It looks like it's difficult for me to turn a configuration (data) into a pipeline without writing too much code.

Here I need to call a bunch of setXXX methods to specify the parameters, while in scikit-learn I just need a Map.

One important scenario is loading the pipeline and parameters from a configuration file without the need to recompile my code.

Can I do that?

Jianshi

@mengxr (Contributor, Author) commented Nov 14, 2014

@huangjs Serialization is on the TODO list. Each parameter is associated with a unique name, and each object contains a unique id, so loading from or saving to a configuration file is definitely feasible. We didn't expose getParam(String) and get/set(Param) in this PR, but you should be able to use them to load a pipeline configuration dynamically with a few lines of code if you are working under org.apache.spark.ml.
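
A hedged sketch of that idea (hypothetical helper; it assumes the not-yet-public getParam(String) and set(Param, value) members mentioned above, so it would have to live under org.apache.spark.ml):

package org.apache.spark.ml

import org.apache.spark.ml.param.{Param, Params}

object PipelineConfig {
  /** Apply name -> value pairs read from a configuration file onto a component. */
  def configure[P <: Params](stage: P, conf: Map[String, Any]): P = {
    conf.foreach { case (name, value) =>
      val param = stage.getParam(name).asInstanceOf[Param[Any]]
      stage.set(param, value)
    }
    stage
  }
}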

@huangjs commented Nov 14, 2014

@mengxr, thanks for the explanation. Statically type-checked pipelines are nice, but I don't think we need that for parameters, since parameters are usually read either from user input or from some database, and we'll need to check their ranges anyway (e.g., positive non-zero values, or [0, 1), etc.).

Input validation would be good enough. We can just use a Map[String, Any] for inputs, and we can have a Map for pipeline -> parameters too.
