joergboe committed Mar 21, 2019
2 parents 671a66b + e4e3dd7 commit 3653b38
Showing 16 changed files with 318 additions and 133 deletions.
78 changes: 78 additions & 0 deletions README.md
@@ -8,3 +8,81 @@ To learn more about Streams:
* [Introduction to Streams Quick Start Edition](http://ibmstreams.github.io/streamsx.documentation/docs/4.1/qse-intro/)
* [Streams Getting Started Guide](http://ibmstreams.github.io/streamsx.documentation/docs/4.1/qse-getting-started/)
* [StreamsDev](https://developer.ibm.com/streamsdev/)

# Developing and running applications that use the SparkMLLib Toolkit

To create applications that use the SparkMLLib Toolkit, you must configure either Streams Studio
or the SPL compiler to be aware of the location of the toolkit.

## Before you begin

* Install IBM InfoSphere Streams. Configure the product environment variables by entering the following command:
  `source product-installation-root-directory/4.0.1.0/bin/streamsprofile.sh`
* Install Apache Spark 2.4.0 and set the `SPARK_HOME` environment variable to the location where Spark is installed. Note that `SPARK_HOME` must be set on all nodes of the Streams cluster where a SparkMLLib operator can run.
* Generate a Spark model as described in the next section and save it to the local filesystem or HDFS.
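The preparation steps above can be sketched as a short shell session. The install paths below are assumptions for illustration only; substitute the locations used in your environment. The `streamsprofile.sh` call is commented out because it requires an actual Streams installation.

```shell
# Hypothetical install locations -- adjust to your environment.

# Source the Streams profile to set STREAMS_INSTALL and related variables
# (commented out; requires an InfoSphere Streams installation):
# source /opt/ibm/InfoSphere_Streams/4.0.1.0/bin/streamsprofile.sh

# Point SPARK_HOME at the Spark 2.4.0 installation; remember this must be
# set on every node where a SparkMLLib operator can run.
export SPARK_HOME=/opt/spark-2.4.0
echo "SPARK_HOME=$SPARK_HOME"
```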

## Spark Models
This toolkit provides a number of operators that can load a stored Spark MLlib model and use it to perform real-time scoring on incoming tuple data.

For example, the SparkCollaborativeFilteringALS operator
can load a Spark collaborative filtering model (of type MatrixFactorizationModel in the Spark API). Before the operator can use such a model within Streams, the Spark program that trained the
model must persist it. The following Scala code demonstrates how the model can be saved to HDFS:
```scala
import org.apache.spark.mllib.recommendation.ALS

// Train a MatrixFactorizationModel against the training data
val model = ALS.train(training, rank, numIter, lambda)
// Persist the trained model so the Streams operator can load it later
model.save(sparkContext, "hdfs://some/path/my_model")
```

Once the model has been persisted, the path to the persisted model is passed as a parameter to the SparkCollaborativeFilteringALS operator. The following code
demonstrates how this is done in the SPL program:


```
(stream<int32 user, int32 counter, list<int32> analysisResult> SparkCollaborativeFilteringALSOut)
  as SparkCollaborativeFilteringALSOp1 = SparkCollaborativeFilteringALS(InputPort1)
{
  param
    analysisType : RecommendProducts ;
    attr1 : Beacon_1_out0.user ;
    attr2 : Beacon_1_out0.counter ;
    modelPath : "hdfs://some/path/my_model" ;
}
```

On initialization, the operator loads the model. Each incoming tuple is then scored against the model, and the score is emitted in an output attribute called 'analysisResult'.

## To Use this Toolkit in your Application

After the location of the toolkit is communicated to the compiler, the SPL artifacts that are specified in the toolkit
can be used by an application. The application can include a use directive to bring the necessary namespaces into scope.
Alternatively, you can fully qualify the operators that are provided by the toolkit with their namespaces as prefixes.

1. Verify that the SPARK_HOME environment variable is set as described above.
2. Make sure that a trained Spark model has been saved to the local file system or on HDFS.
3. Configure the SPL compiler to find the toolkit root directory. Use one of the following methods:
* Set the **STREAMS_SPLPATH** environment variable to the root directory of a toolkit or multiple toolkits (with `:` as a separator).
For example:
`export STREAMS_SPLPATH=$STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib`
* Specify the **-t** or **--spl-path** command parameter when you run the **sc** command. For example:
`sc -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib -M MyMain`
where MyMain is the name of the SPL main composite.
**Note**: These command parameters override the **STREAMS_SPLPATH** environment variable.
* Add the toolkit location in InfoSphere Streams Studio.
4. Develop your application.
5. Build your application. You can use the **sc** command or Streams Studio.
6. Start the InfoSphere Streams instance.
7. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.
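Steps 3 through 7 can be sketched as a shell session. The toolkit path, the `MyMain` composite name, and the bundle location are assumptions for illustration; the `sc` and `streamtool` invocations are commented out because they require a Streams installation and a running instance.

```shell
# Hypothetical paths -- adjust to your installation.
STREAMS_INSTALL="${STREAMS_INSTALL:-/opt/ibm/streams}"

# Tell the SPL compiler where to find the toolkit:
export STREAMS_SPLPATH="$STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib"

# Build the main composite (commented out; requires a Streams installation):
# sc -t "$STREAMS_SPLPATH" -M MyMain

# Submit the resulting bundle as a job (commented out; requires a running instance):
# streamtool submitjob output/MyMain.sab

echo "STREAMS_SPLPATH=$STREAMS_SPLPATH"
```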

# What's changed
## Version 1.2.0
* Use of Apache Spark version 2.4.0

## Version 1.1.1
* Some path changes

## Version 1.1.0
* Internationalization for languages de_DE, es_ES, fr_FR, it_IT, ja_JP, ko_KR, pt_BR, ru_RU, zh_CN, zh_TW

59 changes: 19 additions & 40 deletions build.xml
@@ -7,13 +7,14 @@
<!-- Exit if there is a bad value in streams.install -->
<fail unless="env.STREAMS_INSTALL" message="No streams installation found. Exiting!"/>
<!-- set global properties for this build -->
<property name="toolkit.dir" location="com.ibm.streamsx.sparkmllib" />
<property name="toolkit.test" location="tests" />
<property name="toolkit.doc" location="doc/spldoc" />
<property name="toolkit.samples.dir" location="samples" />
<property name="toolkit.dir" value="com.ibm.streamsx.sparkmllib" />
<property name="toolkit.test" value="tests" />
<property name="doc.string" value="doc"/>
<property name="doc.dir" value="doc/spldoc" />
<property name="samples.dir" value="samples" />
<property name="spl-mt" value="${env.STREAMS_INSTALL}/bin/spl-make-toolkit"/>
<property name="spl-md" value="${env.STREAMS_INSTALL}/bin/spl-make-doc"/>
<property name="release.dir" location="release"/>
<property name="release.dir" value="release"/>

<!-- Create the time stamp -->
<tstamp/>
@@ -26,28 +27,9 @@
</exec>

<!-- these targets are here due to compatibility -->
<target name="all" depends="toolkit" description="Build all toolkit artifacts - incremental build"/>
<target name="all" depends="doc" description="Build all toolkit artifacts and spl docs - incremental build"/>
<target name="clean" depends="toolkit-clean,doc-clean" description="Clean all generated and downloaded toolkit files, clean samples and clean the documentation"/>

<!-- Merge the commit hash into toolkit version number -->
<target name="setcommitversion">
<!-- Update the info.xml -->
<replace file="${toolkit.dir}/info.xml" token="__dev__" value="commit_${commithash}" summary="yes" />
<!-- Extract info from the toolkit's info.xml -->
<xmlproperty file="${toolkit.dir}/info.xml" prefix="tkinfo" keepRoot="no" />
<echo message="Toolkit Version: ${tkinfo.info:identity.info:version}" />
<echo message="Git Hash: ${commithash}" />
</target>

<!-- Revert setcommitversion -->
<target name="revertversion">
<exec executable="git">
<arg value="checkout" />
<arg value="--" />
<arg value="${toolkit.dir}/info.xml" />
</exec>
</target>

<target name="toolkit" description="Build toolkit code and index the toolkit - incremental build.">
<ant dir="${toolkit.dir}" target="all" />
</target>
@@ -57,13 +39,11 @@
<delete dir="${tmp}" />
</target>

<!-- TODO: currently only the spldoc is generated with commitversion in info.xml -->
<target name="doc" depends="toolkit"
description="Generate the toolkit documentation">
<antcall target="setcommitversion"/>
<property name="tktitle" value="IBMStreams ${ant.project.name} Toolkit" />
<property name="tkauthor" value="IBMStreams Open Source Community at GitHub - https://github.com/IBMStreams/${ant.project.name}" />
<echo message="Toolkit to SPLDOC: ${toolkit.doc}" />
<echo message="Toolkit to SPLDOC: ${doc.dir}" />
<exec executable="${spl-md}">
<arg value="--include-composite-operator-diagram" />
<arg value="--author" />
@@ -74,37 +54,36 @@
<arg value="${toolkit.dir}" />
<arg value="--copy-image-files" />
<arg value="--output-directory" />
<arg value="${toolkit.doc}" />
<arg value="${doc.dir}" />
</exec>
<antcall target="revertversion"/>
</target>

<target name="doc-clean"
description="Clean the toolkit documentation">
<delete dir="${toolkit.doc}"/>
<delete dir="${doc.dir}"/>
</target>

<!-- Test targets -->
<target name="test"
description="Execute the toolkit test. Requires an up to date toolkit build">
<target name="test" depends="toolkit"
description="Execute the toolkit test">
<!-- <ant dir="${toolkit.test}" target="all" /> -->
<echo message="Check if toolkit is compile clean"/>
<antcall target="samples"/>
</target>


<!-- Targets called on samples -->
<target name="samples"
description="Build all samples. Requires an up to date toolkit build">
<target name="samples" depends="toolkit"
description="Build all samples">
<subant target="samplebuild" genericantfile="${basedir}/build.xml" failonerror="true">
<dirset dir="samples" includes="*"/>
<dirset dir="${samples.dir}" includes="*"/>
</subant>
</target>

<target name="samples-clean"
description="Clean all samples">
<subant target="sampleclean" genericantfile="${basedir}/build.xml" failonerror="true">
<dirset dir="samples" includes="*"/>
<dirset dir="${samples.dir}" includes="*"/>
</subant>
</target>

@@ -134,12 +113,12 @@
<mkdir dir="${release.dir}"/>
<xmlproperty file="${toolkit.dir}/info.xml" prefix="tkinfo" keepRoot="no"/>
<echo message="Make releasefile Toolkit Version: ${tkinfo.info:identity.info:version}"/>
<property name="releasefilename" value="${release.dir}/${ant.project.name}-${tkinfo.info:identity.info:version}-${DSTAMP}-${TSTAMP}-${commithash}.tgz"/>
<!-- TODO: include samples .project .settings and .classpath -->
<property name="releasefilename" value="${release.dir}/${ant.project.name}-${tkinfo.info:identity.info:version}-${commithash}-${DSTAMP}-${TSTAMP}.tgz"/>
<tar compression="gzip" longfile="gnu"
destfile="${releasefilename}"
basedir="${basedir}"
excludes="**/.toolkitList **/.gitignore **/.settings/** **/.settings **/.project **/.classpath **/opt/downloaded/** release/** tmp/**"
includes="${toolkit.dir}/** ${samples.dir}/** ${doc.string}/** README.md LICENSE"
excludes="**/.toolkitList **/.gitignore ${toolkit.dir}/.settings/ ${toolkit.dir}/.project ${toolkit.dir}/.classpath ${toolkit.dir}/build.xml ${toolkit.dir}/pom.xml ${toolkit.dir}/impl/java/ **/opt/downloaded/** ${release.dir}/** ${samples.dir}/*/output/ tmp/**"
/>
<checksum file="${releasefilename}"/>
<checksum algorithm="sha1" file="${releasefilename}"/>
