From abf32ac56cefd19609adb5a9692636c19d78d270 Mon Sep 17 00:00:00 2001
From: Jin Chi He
Date: Wed, 30 Jan 2019 01:44:53 +0800
Subject: [PATCH] update pipeline TFX taxi sample to support on-prem cluster

---
 .../src/apiserver/config/sample_config.json   |   2 +-
 samples/tfx/README.md                         |  45 +++-
 ...> taxi-cab-classification-pipeline-gcp.py} |   0
 ...axi-cab-classification-pipeline-on-prem.py | 201 ++++++++++++++++++
 test/sample-test/run_test.sh                  |  18 +-
 5 files changed, 246 insertions(+), 20 deletions(-)
 rename samples/tfx/{taxi-cab-classification-pipeline.py => taxi-cab-classification-pipeline-gcp.py} (100%)
 create mode 100644 samples/tfx/taxi-cab-classification-pipeline-on-prem.py

diff --git a/backend/src/apiserver/config/sample_config.json b/backend/src/apiserver/config/sample_config.json
index 81e0917bb61..0fc86c1dfe8 100644
--- a/backend/src/apiserver/config/sample_config.json
+++ b/backend/src/apiserver/config/sample_config.json
@@ -7,7 +7,7 @@
   {
     "name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer",
     "description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
-    "file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz"
+    "file":"/samples/tfx/taxi-cab-classification-pipeline-gcp.py.tar.gz"
   },
   {
     "name":"[Sample] Basic - Sequential",
diff --git a/samples/tfx/README.md b/samples/tfx/README.md
index 5ec149476cc..a4477510a30 100644
--- a/samples/tfx/README.md
+++ b/samples/tfx/README.md
@@ -1,4 +1,4 @@
-The `taxi-cab-classification-pipeline.py` sample runs a pipeline with TensorFlow's transform and model-analysis components.
+This sample runs a pipeline with TensorFlow's transform and model-analysis components. `taxi-cab-classification-pipeline-gcp.py` targets GCP, and `taxi-cab-classification-pipeline-on-prem.py` targets an on-prem cluster.
 
 ## The dataset
 
@@ -19,30 +19,54 @@ dataset in [Google BigQuery](https://cloud.google.com/bigquery/).
 Explore the full dataset in the
 [BigQuery UI](https://bigquery.cloud.google.com/dataset/bigquery-public-data:chicago_taxi_trips).
 
+
 ## Requirements
 
-Preprocessing and model analysis use [Apache Beam](https://beam.apache.org/).
+- Using GCP
+
+  Preprocessing and model analysis use [Apache Beam](https://beam.apache.org/).
+
+  When run in `cloud` mode (instead of `local` mode), those steps use [Google Cloud Dataflow](https://cloud.google.com/dataflow) to run the Beam pipelines.
 
-When run with the `cloud` mode (instead of the `local` mode), those steps use [Google Cloud DataFlow](https://beam.apache.org/) for running the Beam pipelines.
+  Therefore, you must enable the Dataflow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the Dataflow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).
+
+- On-prem cluster
+
+  When running on an on-prem cluster, follow the [Kubernetes persistent volumes documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) to create a persistent volume and a persistent volume claim for storing the intermediate data and results. Note that the `accessModes` should be `ReadWriteMany` so that the volume can be mounted read-write by many nodes. For example, the `taxi-cab-classification-pipeline-on-prem.py` sample expects a persistent volume claim named `pipeline-pvc`; a minimal sketch follows.
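+
+  The sketch below is one way to create such a claim and stage the sample data described under *Deploying the pipeline*; the `10Gi` request, the reliance on a default `ReadWriteMany`-capable storage class, and the `busybox` helper pod are all assumptions to adjust for your cluster:
+
+  ```bash
+  # Create a ReadWriteMany claim named pipeline-pvc (assumes the cluster's
+  # default storage class, e.g. an NFS-backed one, supports ReadWriteMany).
+  kubectl create -f - <<EOF
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: pipeline-pvc
+  spec:
+    accessModes:
+      - ReadWriteMany
+    resources:
+      requests:
+        storage: 10Gi
+  EOF
+
+  # Stage the training data: mount the claim in a temporary pod, wait for the
+  # pod to start, copy the downloaded taxi-cab-classification directory onto
+  # the volume, then delete the pod.
+  kubectl run pvc-loader --restart=Never --image=busybox \
+    --overrides='{"apiVersion": "v1", "spec": {"volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "pipeline-pvc"}}], "containers": [{"name": "pvc-loader", "image": "busybox", "command": ["sleep", "3600"], "volumeMounts": [{"mountPath": "/mnt", "name": "data"}]}]}}'
+  kubectl wait --for=condition=Ready pod/pvc-loader
+  kubectl cp taxi-cab-classification pvc-loader:/mnt/taxi-cab-classification
+  kubectl delete pod pvc-loader
+  ```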
 
-Therefore, you must enable the DataFlow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the DataFlow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).
 
 ## Compiling the pipeline template
 
 Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK, then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.
 
-```bash
-dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
-```
+- GCP
+  ```bash
+  dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline.tar.gz
+  ```
+- On-prem cluster
+  ```bash
+  dsl-compile --py taxi-cab-classification-pipeline-on-prem.py --output taxi-cab-classification-pipeline.tar.gz
+  ```
 
 ## Deploying the pipeline
 
 Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.
 
-The pipeline requires two arguments:
+### GCP
+
+The pipeline requires two arguments:
+
+1. The name of a GCP project.
+2. An output directory in a Google Cloud Storage bucket, of the form `gs://<bucket>/<path>`.
+
+### On-prem cluster
+
+- Before deploying the pipeline, download the training and evaluation data [taxi-cab-classification](https://github.com/kubeflow/pipelines/tree/master/samples/tfx/taxi-cab-classification) from GitHub, and copy the directory to the persistent volume storage (the sketch in the *Requirements* section shows one way to do this).
+
+- Follow the [guide](https://www.kubeflow.org/docs/pipelines/pipelines-ui/) to create an experiment and a run inside the experiment.
+
+  **Limitation**: The value of the `pvc_name` parameter must match the claim name hard-coded in the pipeline definition in the `taxi-cab-classification-pipeline-on-prem.py` file. See the [dsl PipelineParam does not work under Image or Command](https://github.com/kubeflow/pipelines/issues/521) issue for more information about this limitation.
 
-1. The name of a GCP project.
-2. An output directory in a Google Cloud Storage bucket, of the form `gs://<bucket>/<path>`.
 
 ## Components source
 
@@ -61,3 +85,4 @@ Analysis:
 Prediction:
 [source code](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict/src)
 [container](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict)
+
diff --git a/samples/tfx/taxi-cab-classification-pipeline.py b/samples/tfx/taxi-cab-classification-pipeline-gcp.py
similarity index 100%
rename from samples/tfx/taxi-cab-classification-pipeline.py
rename to samples/tfx/taxi-cab-classification-pipeline-gcp.py
diff --git a/samples/tfx/taxi-cab-classification-pipeline-on-prem.py b/samples/tfx/taxi-cab-classification-pipeline-on-prem.py
new file mode 100644
index 00000000000..233f002db98
--- /dev/null
+++ b/samples/tfx/taxi-cab-classification-pipeline-on-prem.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import kfp.dsl as dsl
+from kubernetes import client as k8s_client
+
+
+def dataflow_tf_data_validation_op(inference_data, validation_data,
+                                   column_names, key_columns, project, mode,
+                                   validation_output, step_name='validation'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--csv-data-for-inference', inference_data,
+            '--csv-data-to-validate', validation_data,
+            '--column-names', column_names,
+            '--key-columns', key_columns,
+            '--project', project,
+            '--mode', mode,
+            '--output', '%s/{{workflow.name}}/validation' % validation_output,
+        ],
+        file_outputs={
+            'schema': '/schema.txt',
+            'validation': '/output_validation_result.txt',
+        }
+    )
+
+
+def dataflow_tf_transform_op(train_data, evaluation_data, schema,
+                             project, preprocess_mode, preprocess_module,
+                             transform_output, step_name='preprocess'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--train', train_data,
+            '--eval', evaluation_data,
+            '--schema', schema,
+            '--project', project,
+            '--mode', preprocess_mode,
+            '--preprocessing-module', preprocess_module,
+            '--output', '%s/{{workflow.name}}/transformed' % transform_output,
+        ],
+        file_outputs={'transformed': '/output.txt'}
+    )
+
+
+def tf_train_op(transformed_data_dir, schema, learning_rate: float, hidden_layer_size: int,
+                steps: int, target: str, preprocess_module,
+                training_output, step_name='training'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--transformed-data-dir', transformed_data_dir,
+            '--schema', schema,
+            '--learning-rate', learning_rate,
+            '--hidden-layer-size', hidden_layer_size,
+            '--steps', steps,
+            '--target', target,
+            '--preprocessing-module', preprocess_module,
+            '--job-dir', '%s/{{workflow.name}}/train' % training_output,
+        ],
+        file_outputs={'train': '/output.txt'}
+    )
+
+
+def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data, schema,
+                                 project, analyze_mode, analyze_slice_column, analysis_output,
+                                 step_name='analysis'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--model', model,
+            '--eval', evaluation_data,
+            '--schema', schema,
+            '--project', project,
+            '--mode', analyze_mode,
+            '--slice-columns', analyze_slice_column,
+            '--output', '%s/{{workflow.name}}/analysis' % analysis_output,
+        ],
+        file_outputs={'analysis': '/output.txt'}
+    )
+
+
+def dataflow_tf_predict_op(evaluation_data, schema, target: str,
+                           model: 'TensorFlow model', predict_mode, project, prediction_output,
+                           step_name='prediction'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--data', evaluation_data,
+            '--schema', schema,
+            '--target', target,
+            '--model', model,
+            '--mode', predict_mode,
+            '--project', project,
+            '--output', '%s/{{workflow.name}}/predict' % prediction_output,
+        ],
+        file_outputs={'prediction': '/output.txt'}
+    )
+
+
+def confusion_matrix_op(predictions, output, step_name='confusion_matrix'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--output', '%s/{{workflow.name}}/confusionmatrix' % output,
+            '--predictions', predictions,
+            '--target_lambda', """lambda x: (x['target'] > x['fare'] * 0.2)""",
+        ])
+
+
+def roc_op(predictions, output, step_name='roc'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-local-roc:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--output', '%s/{{workflow.name}}/roc' % output,
+            '--predictions', predictions,
+            '--target_lambda', """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0""",
+        ])
+
+
+def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, pvc_name, step_name='deploy'):
+    return dsl.ContainerOp(
+        name=step_name,
+        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
+        arguments=[
+            '--cluster-name', 'tfx-taxi-pipeline-on-prem',
+            '--model-path', model,
+            '--server-name', tf_server_name,
+            '--model-storage-type', 'nfs',
+            '--pvc-name', pvc_name,
+        ]
+    )
+
+
+@dsl.pipeline(
+    name='TFX Taxi Cab Classification Pipeline Example',
+    description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
+)
+def taxi_cab_classification(
+        pvc_name='pipeline-pvc',
+        project='tfx-taxi-pipeline-on-prem',
+        column_names='taxi-cab-classification/column-names.json',
+        key_columns='trip_start_timestamp',
+        train='taxi-cab-classification/train.csv',
+        evaluation='taxi-cab-classification/eval.csv',
+        mode='local',
+        preprocess_module='taxi-cab-classification/preprocessing.py',
+        learning_rate=0.1,
+        hidden_layer_size=1500,
+        steps=3000,
+        analyze_slice_column='trip_start_hour'):
+    tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'
+    # The claim name below is hard-coded rather than taken from the pvc_name
+    # parameter because dsl.PipelineParam values cannot be used inside volume
+    # definitions yet; see https://github.com/kubeflow/pipelines/issues/521.
+    # Keep it consistent with the pvc_name argument when running the pipeline.
+    validation = dataflow_tf_data_validation_op('/mnt/%s' % train, '/mnt/%s' % evaluation, '/mnt/%s' % column_names,
+                                                key_columns, project, mode, '/mnt').add_volume(
+        k8s_client.V1Volume(name='pipeline-nfs', persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
+            claim_name='pipeline-pvc'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    preprocess = dataflow_tf_transform_op('/mnt/%s' % train, '/mnt/%s' % evaluation, validation.outputs['schema'],
+                                          project, mode, '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size, steps,
+                           'tips', '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    analysis = dataflow_tf_model_analyze_op(training.output, '/mnt/%s' % evaluation, validation.outputs['schema'],
+                                            project, mode, analyze_slice_column, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    prediction = dataflow_tf_predict_op('/mnt/%s' % evaluation, validation.outputs['schema'], 'tips', training.output,
+                                        mode, project, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    cm = confusion_matrix_op(prediction.output, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    roc = roc_op(prediction.output, '/mnt').add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+    deploy = kubeflow_deploy_op(training.output, tf_server_name, pvc_name).add_volume_mount(
+        k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
+
+
+if __name__ == '__main__':
+    import kfp.compiler as compiler
+
+    compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz')
diff --git a/test/sample-test/run_test.sh b/test/sample-test/run_test.sh
index 53ba79efef1..6d26c2f8337 100755
--- a/test/sample-test/run_test.sh
+++ b/test/sample-test/run_test.sh
@@ -169,17 +169,17 @@ elif [ "$TEST_NAME" == "tfx" ]; then
   cd ${BASE_DIR}/samples/tfx
 
   if [ -n "${DATAFLOW_TFT_IMAGE}" ];then
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFT_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_PREDICT_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFDV_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFMA_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DNNTRAINER_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DEPLOYER_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" taxi-cab-classification-pipeline.py
-    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_PREDICT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFDV_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFMA_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DNNTRAINER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DEPLOYER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
+    sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
   fi
 
-  dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
+  dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline.tar.gz
 
   cd "${TEST_DIR}"
   python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
 
   echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"