Example code for using Spark with Koverse
This Transform shows how two input Collections can be joined together. The field names to join on are Parameters, so we again avoid hardcoding. Try running this on collections created from the `departments.csv` and `employees.csv` example datasets using the field name `departmentId` to join on.

See `src/main/scala/com/koverse/example/spark/JoinTransform.java`.

After running `mvn package`, this transform will be in the `target/koverse-spark-examples-0.1.0.jar` addon archive, ready for upload to Koverse.
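Conceptually, the transform matches Records from the two Collections on the parameterized field names. A minimal pure-Python sketch of that idea (the real transform uses Spark's join; the helper name below is illustrative, not part of the Koverse API):

```python
def join_records(left, right, left_field, right_field):
    """Inner-join two lists of dict Records on the given field names."""
    # Index the right-hand Records by their join key.
    by_key = {}
    for rec in right:
        by_key.setdefault(rec.get(right_field), []).append(rec)
    # For each left-hand Record, emit one merged Record per match.
    joined = []
    for rec in left:
        for match in by_key.get(rec.get(left_field), []):
            merged = dict(match)
            merged.update(rec)
            joined.append(merged)
    return joined

departments = [{"departmentId": 1, "name": "Sales"}]
employees = [{"departmentId": 1, "employee": "Ada"}]
print(join_records(employees, departments, "departmentId", "departmentId"))
```

Passing the field names in as arguments mirrors how the Transform takes them as Parameters rather than hardcoding them.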
This is the Python take on the classic parallel computation of counting up words in a text corpus.
- Create the `koverse_spark_examples` conda environment from the yml file in the `python` directory of this repo: `conda env create --file python/koverse_spark_examples.yml --name koverse_spark_examples`
- Activate the `koverse_spark_examples` conda environment (a combination of commonly used Koverse and ML deps): `conda activate koverse_spark_examples`
- While the environment is active, install pyspark 1.6.3: `pip install git+https://github.com/jzerbe/spark.git@v1.6.3-pyspark#egg=pyspark`
- Set the hadoop, spark, and correct python executable paths (updating `~/.bashrc`):

  ```
  export HADOOP_HOME=/opt/hadoop-2.6.0
  export PYSPARK_PYTHON=/anaconda3/envs/koverse_spark_examples/bin/python
  export SPARK_HOME=/opt/spark-1.6.3-bin-hadoop2.6
  ```
- Configure your IDE to use the newly created conda environment. If using IntelliJ, be sure to configure the default test runner to use Nosetests.
- Now you should be able to right-click on a Python test and debug it prior to deploying to a Koverse cluster.
As per the documentation, the bare minimum of files you need in a zip Python addon archive are the `description.yaml` and `transform.py` files. The `transform.py` file must contain a `PySparkTransform` class with `__init__` and `execute` methods.
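A minimal sketch of what such a `transform.py` could look like. The `PySparkTransform`/`__init__`/`execute` requirement comes from the documentation above, but the parameter dict shape and the `context.inputRdd` attribute below are illustrative assumptions, not the documented Koverse API:

```python
# Hypothetical minimal transform.py for a Koverse PySpark addon.
class PySparkTransform(object):
    def __init__(self, params):
        # Transform Parameters are assumed to arrive as a dict;
        # 'textField' is a made-up Parameter name for illustration.
        self.text_field = params.get('textField', 'text')

    def execute(self, context):
        # context.inputRdd is assumed to hold an RDD of dict Records.
        words = context.inputRdd.flatMap(
            lambda record: record.get(self.text_field, '').split())
        # Emit one output Record per word.
        return words.map(lambda w: {'word': w})
```

Consult the Koverse documentation for the exact signatures before deploying; the point here is only the required class shape.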
This transform will be in the `target/PySparkExampleAddon-bundle.zip` addon archive after running `mvn package`.
Word Count, the "Hello World" of parallel computation, is shown being applied to the text in a specific field of Records. The field name is a Parameter to the Transform so we can run this Transform on Records in different Collections without the field name being hardcoded. Try running this on a collection created from the `tweets.jsonstream` example dataset using the field name `text`.
See `src/main/java/com/koverse/example/spark/JavaWordCountTransform.java`, `src/main/scala/com/koverse/example/spark/WordCountTransform.scala`, and `src/main/scala/com/koverse/example/spark/WordCountDataFrameTransform.scala`.
These transforms will be in the `target/koverse-spark-examples-0.1.0.jar` addon archive after running `mvn package`.
This transform uses Spark's `NaiveBayesModel` from its MLlib library to train a model on the data set seen below.
Weather Data Set (see resource datasets):

| Weather                  | PlayTennis |
|--------------------------|------------|
| Overcast Cold Low Weak   | 0          |
| Overcast Mild Low Strong | 0          |
| Sunny Mild Normal Weak   | 0          |
| Rain Hot High Strong     | 1          |
| Rain Mild Low Strong     | 1          |
Using the features of the dataset (i.e. outlook, temperature, humidity, and wind), the model predicts whether you and a friend will play tennis.
The transform saves the model using `ObjectKoverseIO`'s `objectToBytes` function (built on Java's `ByteArrayOutputStream`/`ObjectOutputStream`). This converts any Object to a byte array so that it can be stored in a Koverse `SimpleRecord`.
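The same object-to-bytes pattern can be sketched in Python, using `pickle` as the analogue of Java's `ObjectOutputStream` (the function name and record layout below are illustrative assumptions, not the `ObjectKoverseIO` API):

```python
import pickle

def object_to_bytes(obj):
    # Analogous to ObjectKoverseIO.objectToBytes: serialize an object
    # into a byte array that can be stored in a record field.
    return pickle.dumps(obj)

# Stand-in for a trained model; a real transform would store a NaiveBayesModel.
model_stub = {"classPriors": [0.6, 0.4]}

# Store the serialized bytes in a SimpleRecord-like dict.
record = {"model": object_to_bytes(model_stub)}
```

Storing the model as opaque bytes means the record schema does not need to know anything about the model's internal structure.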
This transform reads in the Spark Naive Bayes model that was saved to a Koverse `SimpleRecord` by `NaiveBayesTrainedTransform`. It leverages `ObjectKoverseIO`'s `objectFromBytes` function (built on Java's `ByteArrayInputStream`/`ObjectInputStream`), which converts a byte array back into the specified type, in this instance a `NaiveBayesModel`. Once the `NaiveBayesModel` is successfully read, it can use the saved prediction data to predict whether you and your friend will play tennis based on the weather. The predictions are then stored in a `SimpleRecord` in Koverse.
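The read path can be sketched the same way: bytes back to an object, then predict. `pickle` again stands in for Java's `ObjectInputStream`, and the stub model below is a deliberately trivial stand-in for `NaiveBayesModel`, not a real classifier:

```python
import pickle

class StubModel:
    """Stand-in for NaiveBayesModel: predicts 1 only when 'Rain' appears."""
    def predict(self, features):
        return 1 if "Rain" in features else 0

def object_from_bytes(data):
    # Analogous to ObjectKoverseIO.objectFromBytes: byte array -> object.
    return pickle.loads(data)

# Bytes as the training transform would have stored them in a record.
stored = pickle.dumps(StubModel())

model = object_from_bytes(stored)
prediction = model.predict(["Rain", "Mild", "Low", "Strong"])

# Store the prediction in a SimpleRecord-like dict.
output_record = {"weather": "Rain Mild Low Strong", "playTennis": prediction}
```

The key point is that serialization round-trips the model intact, so the predicting transform never has to retrain it.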