Mojo pipeline artifacts can be used in Spark to deploy predictions in parallel using the Sparkling Water API. The user needs a Spark cluster with the Sparkling Water JAR file passed to Spark (or, in the case of PySparkling, the PySparkling zip file).
The H2OContext does not have to be created if we only want to run predictions on Mojos with Spark, because the Mojos are written to be independent of the H2O run-time.
The following sections show how to load and run predictions on the Mojo Pipeline in Spark using the Scala and Python APIs.
If you are upgrading H2O Driverless AI, there is good news: Sparkling Water is backwards compatible with Mojo versions produced by older versions of Driverless AI.
Both PySparkling and Sparkling Water need to be started with some extra configuration to enable the Mojo Pipeline.
We need to pass the path to the H2O Driverless AI license to Spark's --jars argument. Additionally, we need to add the path to the Mojo Pipeline implementation JAR file to the same --jars configuration. This file is proprietary and is not part of the resulting Sparkling Water assembly JAR file.
Note: In local Spark mode, please use --driver-class-path to specify the path to the license and the Mojo Pipeline JAR file.
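For example, in local mode the PySpark launch could look roughly like the following sketch (the file names are the same placeholders used in the examples below):
./bin/pyspark --driver-class-path license.sig:mojo-pipeline-impl.jar --py-files pysparkling.zip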
First, let's start PySpark with all the required dependencies:
./bin/pyspark --jars license.sig,mojo-pipeline-impl.jar --py-files pysparkling.zip
We can see that we passed the license and the Mojo Pipeline implementation library to the --jars argument and also specified the path to the PySparkling Python library via --py-files.
At this point, we should have a PySpark interactive terminal available where we can try out our predictions. If we would like to productionize the scoring process, we can use the same configuration, except that instead of ./bin/pyspark we would use ./bin/spark-submit to submit our job to a cluster.
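For example, a batch scoring job could be submitted roughly like this, where score.py is a hypothetical script containing the prediction code shown below:
./bin/spark-submit --jars license.sig,mojo-pipeline-impl.jar --py-files pysparkling.zip score.py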
# First, specify the dependency
from pysparkling.ml import H2OMOJOPipelineModel
# Load the pipeline
mojo = H2OMOJOPipelineModel.create_from_mojo("file:///path/to/the/pipeline.mojo")
# This option ensures that the output columns are named properly. If we want to use the old behaviour,
# where all output columns were stored inside an array, we can leave this configuration option out
# or set it to False. We however strongly encourage users to use the named output columns.
mojo.set_named_mojo_output_columns(True)
# Load the data as a Spark DataFrame
data_frame = spark.read.csv("file:///path/to/the/data.csv", header=True)
# Run the predictions. The predictions contain all the original columns plus the predictions added as new columns
predictions = mojo.predict(data_frame)
# We can easily get the predictions for the desired column using the helper function as follows
predictions.select(mojo.select_prediction_udf("AGE")).collect()
First, let's start the Spark shell with all the required dependencies:
./bin/spark-shell --jars license.sig,mojo-pipeline-impl.jar,sparkling-water-assembly.jar
We can see that we passed the license and the Mojo Pipeline implementation library to the --jars argument and also specified the path to the Sparkling Water assembly JAR.
At this point, we should have a Spark shell available where we can try out our predictions. If we would like to productionize the scoring process, we can use the same configuration, except that instead of ./bin/spark-shell we would use ./bin/spark-submit to submit our job to a cluster.
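For example, a batch scoring job could be submitted roughly like this; the application JAR and main class names are hypothetical placeholders for an application built around the code shown below:
./bin/spark-submit --jars license.sig,mojo-pipeline-impl.jar,sparkling-water-assembly.jar --class com.example.MojoScoring mojo-scoring-app.jar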
// First, specify the dependency
import org.apache.spark.ml.h2o.models.H2OMOJOPipelineModel
// Load the pipeline
val mojo = H2OMOJOPipelineModel.createFromMojo("file:///path/to/the/pipeline.mojo")
// This option ensures that the output columns are named properly. If you want to use the old behaviour,
// where all output columns were stored inside an array, you can leave this configuration option out
// or set it to false. We however strongly encourage users to use the named output columns.
mojo.setNamedMojoOutputColumns(true)
// Load the data as a Spark DataFrame
val dataFrame = spark.read.option("header", "true").csv("file:///path/to/the/data.csv")
// Run the predictions. The predictions contain all the original columns plus the predictions added as new columns
val predictions = mojo.transform(dataFrame)
// We can easily get the predictions for the desired column using the helper function as follows
predictions.select(mojo.selectPredictionUDF("AGE"))