Remove local runner + update docs #335

Merged
merged 5 commits on Jun 22, 2019
4 changes: 2 additions & 2 deletions docs/abstractions/index.md
@@ -58,7 +58,7 @@ Once all the Features and Feature transformations have been defined, actual data
In the example below, we would like to materialize ```bucketizedAge``` and ```nameTokens```. So we set these two Features as the result Features for a new Workflow:

```scala
val workflow = new OPWorkflow().setResultFeatures(bucketizedAge, nameTokens).setReader(PassengerReader)
val workflow = new OpWorkflow().setResultFeatures(bucketizedAge, nameTokens).setReader(PassengerReader)
```

The PassengerReader is a DataReader that essentially specifies a ```read``` method that can be used for loading the Passenger data. When we train this workflow, it reads the Passenger data and fits the bucketization estimator by determining the optimal buckets for ```age```:
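
For illustration, a minimal sketch of that training step (the `workflowModel` name matches the text that follows):

```scala
// fit every estimator in the DAG (e.g. the age bucketizer), producing a fitted workflow model
val workflowModel = workflow.train()
```
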
@@ -73,7 +73,7 @@ The workflowModel now has a prepped DAG of Transformers. By calling the ```score
val dataFrame = workflowModel.setReader(OtherPassengerReader).score()
```

WorkflowModels can be saved and loaded. For more advanced reading on topics like stacking workflows, aggregate DataReaders for time-series data, or joins for DataReaders, follow our links to [Workflows](../developer-guide#workflows) and [Readers](../developer-guide#datareaders).
Workflow models can be saved and loaded. For more advanced reading on topics like stacking workflows, aggregate DataReaders for time-series data, or joins for DataReaders, follow our links to [Workflows](../developer-guide#workflows) and [Readers](../developer-guide#datareaders).



2 changes: 1 addition & 1 deletion docs/conf.py
@@ -176,6 +176,6 @@
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'TransmogrifAI', 'TransmogrifAI',
author, 'TransmogrifAI', 'utomated machine learning for structured data.',
author, 'TransmogrifAI', 'Automated machine learning for structured data.',
'Miscellaneous'),
]
51 changes: 32 additions & 19 deletions docs/developer-guide/index.md
@@ -23,7 +23,7 @@ case class Feature[O <: FeatureType] private[op]
name: String,
isResponse: Boolean,
originStage: OpPipelineStage[O],
parents: Seq[OPFeature],
parents: Seq[FeatureLike[_ <: FeatureType]],
uid: String,
distributions: Seq[FeatureDistributionLike] = Seq.empty
)(implicit val wtt: WeakTypeTag[O]) extends FeatureLike[O] {
@@ -82,7 +82,7 @@ TransmogrifAI Transformers extend Spark Transformers but are designed to interac

### Writing your own transformer

TransmogrifAI Transformers can easily be created by finding the appropriate base class and extending it. The TransmogrifAI Transformer base classes have default implementations for all the book keeping methods associated with Spark and OP. Thus when using OpTranfomer base classes the only things that need to be defined are the name of the operation associated with your Transformer, a default uid function for your transformer, and the function that maps the input to the output.
TransmogrifAI Transformers can easily be created by finding the appropriate base class and extending it. The TransmogrifAI Transformer base classes have default implementations for all the bookkeeping methods associated with Spark and TransmogrifAI. Thus when using OpTransformer base classes, the only things that need to be defined are the name of the operation associated with your Transformer, a default uid function for your transformer, and the function that maps the input to the output.

**Note that when creating a new stage (Transformer or Estimator) the uid should always be a constructor argument.** If the uid is not a constructor argument, the stages will not be serialized correctly when models are saved.
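
For illustration, a minimal sketch of a unary Transformer with the uid as a constructor argument (the class name, operation name, and add-2.0 logic are made-up examples; exact imports may vary slightly by version):

```scala
import com.salesforce.op.UID
import com.salesforce.op.features.types._
import com.salesforce.op.stages.base.unary.UnaryTransformer

// hypothetical transformer: adds 2.0 to a Real feature; note the uid default in the constructor
class AddTwoTransformer(uid: String = UID[AddTwoTransformer])
  extends UnaryTransformer[Real, Real](operationName = "addTwo", uid = uid) {
  def transformFn: Real => Real = (in: Real) => new Real(in.value.map(_ + 2.0))
}
```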

@@ -538,7 +538,7 @@ val workflow = new OpWorkflow().setResultFeatures(survived, rawPrediction, prob,
maxJSDivergence = 0.90,
maxCorrelation = 0.95,
correlationType = CorrelationType.Pearson,
protectedFeatures = Array.empty[OPFeature]
protectedFeatures = Array.empty()
)
```

@@ -611,7 +611,7 @@ All TransmogrifAI Stages can be used as spark ML stages by passing a Dataset or

Workflows are used to control the execution of the ML pipeline once the final features have been defined. Each Feature contains the history of how it is defined by tracking both the parent Features and the parent Stages. However, this is simply a *description* of how the raw data will be transformed in order to create the final data desired; until the Features are put into a Workflow there is no actual data associated with the pipeline. OpWorkflows create and transform the raw data needed to compute the Features fed into them. In addition, they optimize the application of Stages needed to create the final Features, ensuring optimal computations within the full pipeline DAG. OpWorkflows can be fit to a given dataset using the `.train()` method. This produces an OpWorkflowModel which can then be saved to disk and applied to another dataset.

### Creating A Workflow
### Creating a Workflow

In order to create a Workflow that can be used to generate the desired features, the result Features and the Reader (or data source) must be defined.
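
For example, a minimal sketch (the feature and reader names mirror the abstractions example above and are assumed to be in scope):

```scala
// the result Features plus a DataReader are all that is needed to define the workflow
val workflow = new OpWorkflow()
  .setResultFeatures(bucketizedAge, nameTokens)
  .setReader(PassengerReader)
```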

@@ -641,11 +641,11 @@ When a workflow gets fitted
val model: OpWorkflowModel = workflow.train()
```

a number of things happen: the data is read using the DataReader, raw Features are built, each Stage is executed in sequence and all Features are materialized and added to the underlying Dataframe. During Stage execution, each Estimator gets fitted and becomes a Transformer. A fitted Workflow (eg. a WorkflowModel) therefore contains sequence of Transformers (map operations) which can be applied to any input data of the appropriate type.
a number of things happen: the data is read using the DataReader, raw Features are built, each Stage is executed in sequence, and all Features are materialized and added to the underlying Dataframe. During Stage execution, each Estimator gets fitted and becomes a Transformer. A fitted Workflow (i.e. an OpWorkflowModel) therefore contains a sequence of Transformers (map operations) which can be applied to any input data of the appropriate type.

### Fitted Workflows

A WorkflowModel, much like a Spark Pipeline, can be used to score data ingested from any DataReader that returns the appropriate data input type. If the score method is called immediately after train, it will score the data used to train the model. Updating the DataReader_ _allows scoring on a test set or on production data.
An OpWorkflowModel, much like a Spark Pipeline, can be used to score data ingested from any DataReader that returns the appropriate data input type. If the score method is called immediately after train, it will score the data used to train the model. Updating the DataReader allows scoring on a test set or on production data.

```scala
val trainDataReader = DataReaders.Aggregate.avro[Passenger](key = _.passengerId, path = Some("my/train/data/path")) // aggregates data across many records
@@ -671,7 +671,7 @@ val df = workflowModel.computeDataUpTo(normedAge)

Here it is important to realize that when models are shared across applications, or namespaces, all Features must be within the scope of the calling application. A user developing a model with the intention of sharing it must take care to properly define and expose all Features.

### Saving Workflows
### Saving Fitted Workflows

Fitted workflows can be saved to a file which can be reloaded and used to score data at a later time. The typical use case is for one run to train a workflow, while another run will use this fitted workflow for scoring. Saving a workflow requires the use of the dedicated save method.
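
A minimal sketch of the save call (assuming `model` is the fitted OpWorkflowModel from `workflow.train()` above; the path is an example):

```scala
// persist the fitted workflow model so a separate run can load it for scoring
model.save("/my/model/location")
```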

@@ -684,23 +684,34 @@ The saved workflow model consists of several json files that contain all the inf

For features we store their type, name, whether they are a response or a predictor, origin stage UID, and parent feature names.

Persisting transfomer stages is trivial. We save transformer's class name, UID and Spark param values provided during training.
For workflow stages we save their class name, UID, constructor arguments and Spark param values provided during training. Using the class name and the constructor arguments we are then able to reconstruct the instance of the stage and set the Spark param values.

Estimators are a bit trickier. We save the class name of the estimator model, UID, constructor arguments and Spark param values provided during training. Using the class name and the constructor arguments we then able to reconctruct the instance of the estimator model and set the Spark param values.
If you would like to customize how a stage is saved/loaded, you can provide a special reader/writer implementation as follows:

```scala
// implement a custom reader/writer for your stage
class MyStageReaderWriter extends OpPipelineStageReaderWriter[MyStage] {
def read(stageClass: Class[MyStage], json: JValue): Try[MyStage] = ???
def write(stage: MyStage): Try[JValue] = ???
}

// annotate your transformer with @ReaderWriter, pointing to the custom reader/writer implementation class MyStageReaderWriter
@ReaderWriter(classOf[MyStageReaderWriter])
class MyStage(...) extends UnaryTransformer[Text, Text] { ... }
```

[Here](https://github.com/salesforce/TransmogrifAI/blob/master/core/src/main/scala/com/salesforce/op/stages/impl/feature/TextTokenizer.scala#L114) is a full example of a custom reader/writer implementation for our own TextTokenizer stage.

### Loading saved Workflows
### Loading Saved Workflows

Just like saving a workflow, loading them requires the use of dedicated load method on the workflow. Note that you have to use the exact same workflow that was used during training, otherwise expect errors.
Loading saved fitted workflows requires the use of the dedicated load method on the OpWorkflowModel object.

```scala
// this is the workflow instance we trained before
val workflow = ...
// load the model for the previosely trained workflow
val workflowModel: OpWorkflowModel = workflow.loadModel(path = "/my/model/location")
// load the model for the previously trained workflow
val workflowModel: OpWorkflowModel = OpWorkflowModel.load(path = "/my/model/location")
```

When loading a model we match the saved features and stages to the ones provided in the workflow instance using their UIDs. For each transformer we simply set its Spark params. The estimator models are contructred using reflection using their class names and costructor arguments, and finally their Spark params are set. The operation of assigning UIDs is based on a process global state, therefore loading models is not a thread safe operation. Meaning if you are going to load models in a multi-threaded program make sure to synchronize access to `loadModel` call accordingly.
When loading a model, we match the saved features and stages using their UIDs. The stages are constructed using reflection from their class names and saved constructor arguments, or using the custom reader/writer implementation provided through the ReaderWriter annotation. Finally, we set Spark params for all the reconstructed stages.

Once loaded, the model can be modified and operated on normally. It is important to note that readers are not saved with the rest of the OpWorkflowModel, so in order to use a loaded model for scoring, a DataReader must be assigned to the OpWorkflowModel.

@@ -711,6 +722,8 @@ val results = workflowModel
.score() // run scoring or evaluation
```

Alternatively, you can use the local scoring module to score models without the need for readers or a Spark runtime. Read more about it [here](https://github.com/salesforce/TransmogrifAI/tree/master/local).
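
A minimal sketch of that local scoring flow (it mirrors the local module's README further down; the record field names and values are made-up):

```scala
import com.salesforce.op.local._ // provides the scoreFunction enrichment

// assumes an implicit SparkSession is in scope for model loading
val model = OpWorkflowModel.load("/path/to/model")
val scoreFn = model.scoreFunction // a plain Scala function: raw record Map => score Map
val scores = Seq(Map("name" -> "Peter", "age" -> 18)).map(scoreFn) // example record
```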

### Removing problematic features

One of the fundamental assumptions of machine learning is that the data you are using to train your model reflects the data that you wish to score. TransmogrifAI has an optional stage after data reading that allows you to check that your features do not violate this assumption and removes any features that do. This stage is called the [RawFeatureFilter](https://github.com/salesforce/TransmogrifAI/blob/master/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala), and to use it you simply call the method `withRawFeatureFilter(Some(trainReader), Some(scoreReader),...)` on your [Workflow](https://github.com/salesforce/op/blob/master/core/src/main/scala/com/salesforce/op/OpWorkflow.scala#L360). This method takes the training and scoring data readers as well as some optional settings for when to exclude features (if you specify the data path in the [OpParams](https://github.com/salesforce/op/blob/master/features/src/main/scala/com/salesforce/op/OpParams.scala) passed into the Workflow, you will need to set the score data path in the `alternateReaderParams`). It will load the training and scoring data and exclude individual features based on fill rate, relative fill rate between training and scoring, or differences in the distribution of data between training and scoring. Features that are excluded based on these criteria will be blacklisted from the model and removed from training.
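
A minimal sketch of wiring this in (reader and feature names are assumptions; the optional thresholds are omitted):

```scala
// exclude raw features whose fill rates or distributions differ too much between the two readers
val workflow = new OpWorkflow()
  .setResultFeatures(survived, prob)
  .withRawFeatureFilter(Some(trainReader), Some(scoreReader))
```
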
@@ -741,7 +754,7 @@ val fittedLeadWorkflow = new OpWorkflow()
.train()

// if you want to use just the fitted label indexer from the model or extract information
// from that fitted stage, you can use the getOriginStageOf method on the fitted WorkflowModel
// from that fitted stage, you can use the getOriginStageOf method on the fitted OpWorkflowModel
val labelIndexer = fittedLeadWorkflow
.getOriginStageOf(indexedLabel).asInstanceOf[OpStringIndexerNoFilter] //

@@ -764,7 +777,7 @@ val newWorkflow = workflow.withModelStages(fittedModel)
```


This will add the stages from the model to the Workflow replacing any estimators that have corresponding fitted models in the workflow with the fitted version. The two workflows and all their stages and features must be created in the same workspace as only directly matching stages will be replaced. When train is called on this Workflow only Estimators that did NOT appear in the previous DAG will be fit in order to create the new WorkflowModel. All stages that appeared in the original WorkflowModel will use the fit values obtained in the first fit (corresponding to the first dataset).
This will add the stages from the model to the Workflow replacing any estimators that have corresponding fitted models in the workflow with the fitted version. The two workflows and all their stages and features must be created in the same workspace as only directly matching stages will be replaced. When train is called on this Workflow only Estimators that did NOT appear in the previous DAG will be fit in order to create the new OpWorkflowModel. All stages that appeared in the original OpWorkflowModel will use the fit values obtained in the first fit (corresponding to the first dataset).

### Metadata

@@ -789,7 +802,7 @@ val metadata = workflowModel.getOriginStageOf(checkedFeatures).getMetadata()
val summaryData = SanityCheckerSummary.fromMetadata(metadata.getSummaryMetadata())
```

If you wish to combine the metadata from stages commonly used in modeling (ModelSelectors, SanityCheckers, Vectorizers) into a single easy(er) to reference case class we have provided a method for this in the [WorkflowModel](#extracting-modelinsights-from-a-fitted-workflow) so that users don't need to stitch this information together for themselves.
If you wish to combine the metadata from stages commonly used in modeling (ModelSelectors, SanityCheckers, Vectorizers) into a single easy(er) to reference case class, we have provided a method for this in the [OpWorkflowModel](#extracting-modelinsights-from-a-fitted-workflow) so that users don't need to stitch this information together for themselves.
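
For instance, a minimal sketch (assuming `prediction` is the fitted model's prediction feature and the `modelInsights` accessor applies to your version):

```scala
// combined insights across the ModelSelector, SanityChecker and vectorizer stages
val insights = workflowModel.modelInsights(prediction)
```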

In addition to accessing Metadata that is created by stages, you may wish to add Metadata to stages of your own. For example, if you created your own string indexer to map strings to integers (though we have a stage that does [this](https://github.com/salesforce/TransmogrifAI/blob/master/core/src/main/scala/com/salesforce/op/stages/impl/feature/OpStringIndexerNoFilter.scala)), you might wish to save the mapping from index back to string in the Metadata of the new column of integers you are creating. You would do this within the `fitFn` of the Estimator you are creating by using the `setMetadata(meta: Metadata)` method. You need a MetadataBuilder object to work with Metadata, which is essentially a wrapper around a Map of Maps. For example, within an Estimator you would get a reference to a MetadataBuilder and use it as follows:
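
A minimal sketch of that pattern (the label values are made-up examples):

```scala
import org.apache.spark.sql.types.MetadataBuilder

// inside the estimator's fitFn: record the index -> string mapping in the output column Metadata
val meta = new MetadataBuilder()
  .putStringArray("labels", Array("red", "green", "blue")) // position i maps index i back to its string
  .build()
setMetadata(meta) // attach to this stage's output feature, as described above
```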

2 changes: 1 addition & 1 deletion docs/examples/Titanic-Binary-Classification.md
@@ -165,7 +165,7 @@ The fitted workflow can now be saved, and loaded again to be applied to any new
```scala
fittedWorkflow.save(saveWorkflowPath)

val savedWorkflow = workflow.loadModel(saveWorkflowPath).setReader(testDataReader)
val savedWorkflow = OpWorkflowModel.load(saveWorkflowPath).setReader(testDataReader)
```


2 changes: 1 addition & 1 deletion docs/faq/index.md
@@ -57,5 +57,5 @@ import com.salesforce.op.utils.spark.RichStructType._
You can! Simply use the `.setInputRDD(myRDD)` or `.setInputDataSet(myDataSet)` methods on Workflow to pass in your data.

## How do I examine intermediate data when trying to debug my ML workflow?
You can generate data up to any particular point in the Workflow using the method `.computeDataUpTo(myFeature)`. Calling this method on your Workflow or WorkflowModel will compute a DataFrame which contains all of the rows for features created up to that point in your flow.
You can generate data up to any particular point in the OpWorkflow using the method `.computeDataUpTo(myFeature)`. Calling this method on your OpWorkflow or OpWorkflowModel will compute a DataFrame which contains all of the rows for features created up to that point in your flow.
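
For instance (feature and model names assumed):

```scala
// DataFrame containing all rows for features computed up to (and including) myFeature
val df = workflowModel.computeDataUpTo(myFeature)
```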

5 changes: 2 additions & 3 deletions local/README.md
@@ -25,9 +25,8 @@ import com.salesforce.op.local._
// Spark Session needed for model loading & score function creation
implicit val spark = SparkSession.builder().getOrCreate()

// Create your workflow & load the model
val workflow: OpWorkflow = ...
val model = workflow.loadModel("/path/to/model")
// Load the trained model
val model = OpWorkflowModel.load("/path/to/model")

// Create score function once and use it indefinitely
val scoreFn = model.scoreFunction