diff --git a/samples/core/kubeflow_pipeline_using_TFX_OSS_components/KubeFlow Pipeline Using TFX OSS Components.ipynb b/samples/core/kubeflow_pipeline_using_TFX_OSS_components/KubeFlow Pipeline Using TFX OSS Components.ipynb deleted file mode 100644 index ce810c5639b..00000000000 --- a/samples/core/kubeflow_pipeline_using_TFX_OSS_components/KubeFlow Pipeline Using TFX OSS Components.ipynb +++ /dev/null @@ -1,745 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# KubeFlow Pipeline Using TFX OSS Components\n", - "\n", - "In this notebook, we will demo:\n", - "\n", - "* Defining a KubeFlow pipeline with the Python DSL\n", - "* Submitting it to the pipeline system\n", - "* Customizing a step in the pipeline\n", - "\n", - "We will use a pipeline that includes some TFX OSS components such as [TFDV](https://github.com/tensorflow/data-validation), [TFT](https://github.com/tensorflow/transform), and [TFMA](https://github.com/tensorflow/model-analysis)." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Setup" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "# Set your output and project. !!!Must do before you can proceed!!!\n", - "EXPERIMENT_NAME = 'demo'\n", - "OUTPUT_DIR = 'Your-Gcs-Path' # Such as gs://bucket/object/path\n", - "PROJECT_NAME = 'Your-Gcp-Project-Name'\n", - "BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n", - "TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME\n", - "TARGET_IMAGE_TWO='gcr.io/%s/pusher_two:dev' % PROJECT_NAME\n", - "TRAIN_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'\n", - "EVAL_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'\n", - "HIDDEN_LAYER_SIZE = '1500'\n", - "STEPS = 3000\n", - "DATAFLOW_TFDV_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "DATAFLOW_TFT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "DATAFLOW_TFMA_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "DATAFLOW_TF_PREDICT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "KUBEFLOW_TF_TRAINER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "KUBEFLOW_TF_TRAINER_GPU_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer-gpu:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "KUBEFLOW_DEPLOYER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:0517114dc2b365a4a6d95424af6157ead774eff3'\n", - "DEPLOYER_MODEL = 'notebook_tfx_taxi'\n", - "DEPLOYER_VERSION_DEV = 'dev'\n", - "DEPLOYER_VERSION_PROD = 'prod'\n", - "DEPLOYER_VERSION_PROD_TWO = 'prodtwo'" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "scrolled": false - }, - "outputs": [], - "source": [ - "# Install the Pipeline SDK and TensorFlow\n", - "!pip3 install kfp --upgrade\n", - "!pip3 install tensorflow==1.8.0" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create an Experiment in the Pipeline System\n", - "\n", - "The pipeline system requires an \"Experiment\" to group pipeline runs. You can create a new experiment, or call `client.list_experiments()` to get existing ones.\n",
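- "\n", - "For example, to reuse an existing experiment instead of always creating a new one, you could do something like the following (a sketch only; the exact response shape of `list_experiments()` may differ across KFP SDK versions):\n", - "\n", - "```python\n", - "resp = client.list_experiments()\n", - "matches = [e for e in (resp.experiments or []) if e.name == EXPERIMENT_NAME]\n", - "exp = matches[0] if matches else client.create_experiment(name=EXPERIMENT_NAME)\n", - "```"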
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Note that this notebook should be running in JupyterHub in the same cluster as the pipeline system.\n", - "# Otherwise it will fail to talk to the pipeline system.\n", - "import kfp\n", - "from kfp import compiler\n", - "import kfp.dsl as dsl\n", - "import kfp.notebook\n", - "import kfp.gcp as gcp\n", - "\n", - "# If you are using Kubeflow JupyterHub, there is no need to set a host in the Client() constructor.\n", - "# But if you are using your local Jupyter instance and have a kubectl connection to the cluster,\n", - "# then do:\n", - "# client = kfp.Client('127.0.0.1:8080/pipeline')\n", - "client = kfp.Client()\n", - "exp = client.create_experiment(name=EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Test Run a Pipeline" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Download a pipeline package\n", - "!gsutil cp gs://ml-pipeline-playground/coin.tar.gz ." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "run = client.run_pipeline(exp.id, 'coin', 'coin.tar.gz')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Define a Pipeline\n", - "\n", - "Authoring a pipeline is just like authoring a normal Python function. The pipeline function describes the topology of the pipeline. Each step in the pipeline is typically a ContainerOp --- a simple class or function describing how to interact with a Docker container image. In the pipeline below, all the container images referenced in the pipeline are already built. The pipeline starts with a TFDV step, which is used to infer the schema of the data. Then it uses TFT to transform the data for training. After a single-node training step, it analyzes the test data predictions and generates a feature slice metrics view using a TFMA component. Finally, it deploys the model to TF-Serving inside the same cluster.\n",
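- "\n", - "As a sketch of the dependency mechanism (illustrative only, not part of this sample): passing one step's output as another step's argument is what makes the second step run after the first.\n", - "\n", - "```python\n", - "step_a = dsl.ContainerOp(name='a', image='alpine',\n", - "                         command=['sh', '-c', 'echo hello > /out.txt'],\n", - "                         file_outputs={'out': '/out.txt'})\n", - "step_b = dsl.ContainerOp(name='b', image='alpine',\n", - "                         arguments=['--input', step_a.outputs['out']])\n", - "```"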
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Below is a list of helper functions that wrap the components to provide a simpler interface for the pipeline function.\n", - "def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):\n", - " return dsl.ContainerOp(\n", - " name = step_name,\n", - " image = DATAFLOW_TFDV_IMAGE,\n", - " arguments = [\n", - " '--csv-data-for-inference', inference_data,\n", - " '--csv-data-to-validate', validation_data,\n", - " '--column-names', column_names,\n", - " '--key-columns', key_columns,\n", - " '--project', project,\n", - " '--mode', mode,\n", - " '--output', validation_output,\n", - " ],\n", - " file_outputs = {\n", - " 'schema': '/schema.txt',\n", - " }\n", - " )\n", - "\n", - "def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):\n", - " return dsl.ContainerOp(\n", - " name = step_name,\n", - " image = DATAFLOW_TFT_IMAGE,\n", - " arguments = [\n", - " '--train', train_data,\n", - " '--eval', evaluation_data,\n", - " '--schema', schema,\n", - " '--project', project,\n", - " '--mode', preprocess_mode,\n", - " '--preprocessing-module', preprocess_module,\n", - " '--output', transform_output,\n", - " ],\n", - " file_outputs = {'transformed': '/output.txt'}\n", - " )\n", - "\n", - "\n", - "def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training', use_gpu=False):\n", - " tf_train_op = dsl.ContainerOp(\n", - " name = step_name,\n", - " image = KUBEFLOW_TF_TRAINER_IMAGE,\n", - " arguments = [\n", - " '--transformed-data-dir', transformed_data_dir,\n", - " '--schema', schema,\n", - " '--learning-rate', learning_rate,\n", - " '--hidden-layer-size', hidden_layer_size,\n", - " '--steps', steps,\n", - " '--target', target,\n", - " '--preprocessing-module', preprocess_module,\n", - " '--job-dir', training_output,\n", - " ],\n", - " file_outputs = {'train': '/output.txt'}\n", - " )\n", - " if use_gpu:\n", - " tf_train_op.image = KUBEFLOW_TF_TRAINER_GPU_IMAGE\n", - " tf_train_op.set_gpu_limit(1)\n", - " \n", - " return tf_train_op\n", - "\n", - "def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):\n", - " return dsl.ContainerOp(\n", - " name = step_name,\n", - " image = DATAFLOW_TFMA_IMAGE,\n", - " arguments = [\n", - " '--model', model,\n", - " '--eval', evaluation_data,\n", - " '--schema', schema,\n", - " '--project', project,\n", - " '--mode', analyze_mode,\n", - " '--slice-columns', analyze_slice_column,\n", - " '--output', analysis_output,\n", - " ],\n", - " file_outputs = {'analysis': '/output.txt'}\n", - " )\n", - "\n", - "\n", - "def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):\n", - " return 
dsl.ContainerOp(\n", - " name = step_name,\n", - " image = DATAFLOW_TF_PREDICT_IMAGE,\n", - " arguments = [\n", - " '--data', evaluation_data,\n", - " '--schema', schema,\n", - " '--target', target,\n", - " '--model', model,\n", - " '--mode', predict_mode,\n", - " '--project', project,\n", - " '--output', prediction_output,\n", - " ],\n", - " file_outputs = {'prediction': '/output.txt'}\n", - " )\n", - "\n", - "def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):\n", - " return dsl.ContainerOp(\n", - " name = step_name,\n", - " image = KUBEFLOW_DEPLOYER_IMAGE,\n", - " arguments = [\n", - " '--model-export-path', '%s/export/export' % model,\n", - " '--server-name', tf_server_name\n", - " ]\n", - " )\n", - "\n", - "\n", - "# The pipeline definition\n", - "@dsl.pipeline(\n", - " name='TFX Taxi Cab Classification Pipeline Example',\n", - " description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'\n", - ")\n", - "def taxi_cab_classification(\n", - " output,\n", - " project,\n", - " column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',\n", - " key_columns='trip_start_timestamp',\n", - " train=TRAIN_DATA,\n", - " evaluation=EVAL_DATA,\n", - " validation_mode='local',\n", - " preprocess_mode='local',\n", - " preprocess_module: dsl.PipelineParam='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',\n", - " target='tips',\n", - " learning_rate=0.1,\n", - " hidden_layer_size=HIDDEN_LAYER_SIZE,\n", - " steps=STEPS,\n", - " predict_mode='local',\n", - " analyze_mode='local',\n", - " analyze_slice_column='trip_start_hour'):\n", - "\n", - " # set the flag to use GPU trainer\n", - " use_gpu = False\n", - " \n", - " validation_output = '%s/{{workflow.name}}/validation' % output\n", - " transform_output = '%s/{{workflow.name}}/transformed' % output\n", - " training_output = '%s/{{workflow.name}}/train' % output\n", - " analysis_output = '%s/{{workflow.name}}/analysis' % output\n", - " prediction_output = '%s/{{workflow.name}}/predict' % output\n", - " tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'\n", - "\n", - " validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, validation_mode, validation_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - "\n", - " preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'], project, preprocess_mode, preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output, use_gpu=use_gpu).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " analysis = dataflow_tf_model_analyze_op(training.output, evaluation, validation.outputs['schema'], project, analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], target, training.output, predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " deploy = kubeflow_deploy_op(training.output, tf_server_name).apply(gcp.use_gcp_secret('user-gcp-sa'))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Submit the run" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Compile it into a tar 
package.\n", - "compiler.Compiler().compile(taxi_cab_classification, 'tfx.zip')\n", - "\n", - "# Submit a run.\n", - "run = client.run_pipeline(exp.id, 'tfx', 'tfx.zip',\n", - " params={'output': OUTPUT_DIR,\n", - " 'project': PROJECT_NAME})" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Customize a step in the above pipeline\n", - "\n", - "Let's say I got the pipeline source code from GitHub, and I want to modify the pipeline slightly by swapping the last deployer step with my own deployer. Instead of the TF-Serving deployer, I want to deploy the model to the Cloud ML Engine service." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create and test a Python function for the new deployer" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# In order to run the function locally, we need this Python package.\n", - "!pip3 install google-api-python-client" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "@dsl.python_component(\n", - " name='cmle_deployer',\n", - " description='deploys a model to GCP CMLE',\n", - " base_image=BASE_IMAGE\n", - ")\n", - "def deploy_model(model_name: str, version_name: str, model_path: str, gcp_project: str, runtime: str):\n", - "\n", - " from googleapiclient import discovery\n", - " from tensorflow.python.lib.io import file_io\n", - " import os\n", - " \n", - " model_path = file_io.get_matching_files(os.path.join(model_path, 'export', 'export', '*'))[0]\n", - " api = discovery.build('ml', 'v1')\n", - " body = {'name': model_name}\n", - " parent = 'projects/%s' % gcp_project\n", - " try:\n", - " api.projects().models().create(body=body, parent=parent).execute()\n", - " except Exception as e:\n", - " # The create call fails if the model already exists; 
ignore that error.\n", - " print(str(e))\n", - " pass\n", - "\n", - " import time\n", - "\n", - " body = {\n", - " 'name': version_name,\n", - " 'deployment_uri': model_path,\n", - " 'runtime_version': runtime\n", - " }\n", - "\n", - " full_model_name = 'projects/%s/models/%s' % (gcp_project, model_name)\n", - " response = api.projects().models().versions().create(body=body, parent=full_model_name).execute()\n", - " \n", - " # Poll the long-running operation until the deployment finishes.\n", - " while True:\n", - " response = api.projects().operations().get(name=response['name']).execute()\n", - " if 'done' not in response or response['done'] is not True:\n", - " time.sleep(10)\n", - " print('still deploying...')\n", - " else:\n", - " if 'error' in response:\n", - " print(response['error'])\n", - " else:\n", - " print('Done.')\n", - " break" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "metadata": { - "scrolled": true - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "still deploying...\n", - "Done.\n" - ] - } - ], - "source": [ - "# Test the function and make sure it works.\n", - "path = 'gs://ml-pipeline-playground/sampledata/taxi/train'\n", - "deploy_model(DEPLOYER_MODEL, DEPLOYER_VERSION_DEV, path, PROJECT_NAME, '1.9')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Build a Pipeline Step With the Above Function (Note: run either of the two options below)\n", - "#### Option One: Specify the dependency directly\n", - "Now that we've tested the function locally, we want to build a component that can run as a step in the pipeline." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# The return value \"DeployerOp\" represents a step that can be used directly in a pipeline function\n", - "DeployerOp = compiler.build_python_component(\n", - " component_func=deploy_model,\n", - " staging_gcs_path=OUTPUT_DIR,\n", - " dependency=[kfp.compiler.VersionedDependency(name='google-api-python-client', version='1.7.0')],\n", - " base_image='tensorflow/tensorflow:1.12.0-py3',\n", - " target_image=TARGET_IMAGE)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Option Two: Build a base Docker image with both the TensorFlow and Google API client packages" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%docker {BASE_IMAGE} {OUTPUT_DIR}\n", - "FROM tensorflow/tensorflow:1.10.0-py3\n", - "RUN pip3 install google-api-python-client" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Once the base Docker image is built, we can build a \"target\" container image that is the base image plus the Python function as the entry point. The target container image can be used as a step in a pipeline.\n",
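- "\n", - "Conceptually, the target image wraps the function in a small command-line entry point, roughly like the sketch below (an assumption for illustration; the wrapper that build_python_component actually generates differs in detail):\n", - "\n", - "```python\n", - "import argparse\n", - "\n", - "# Hypothetical wrapper: one flag per parameter of deploy_model.\n", - "parser = argparse.ArgumentParser()\n", - "for flag in ('model-name', 'version-name', 'model-path', 'gcp-project', 'runtime'):\n", - "    parser.add_argument('--' + flag)\n", - "args = parser.parse_args()\n", - "deploy_model(args.model_name, args.version_name, args.model_path,\n", - "             args.gcp_project, args.runtime)\n", - "```"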
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# The return value \"DeployerOp\" represents a step that can be used directly in a pipeline function\n", - "DeployerOp = compiler.build_python_component(\n", - " component_func=deploy_model,\n", - " staging_gcs_path=OUTPUT_DIR,\n", - " target_image=TARGET_IMAGE)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Modify the pipeline with the new deployer" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# My New Pipeline. It's almost the same as the original one with the last step deployer replaced.\n", - "@dsl.pipeline(\n", - " name='TFX Taxi Cab Classification Pipeline Example',\n", - " description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'\n", - ")\n", - "def my_taxi_cab_classification(\n", - " output,\n", - " project,\n", - " model,\n", - " version,\n", - " column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',\n", - " key_columns='trip_start_timestamp',\n", - " train=TRAIN_DATA,\n", - " evaluation=EVAL_DATA,\n", - " validation_mode='local',\n", - " preprocess_mode='local',\n", - " preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',\n", - " target='tips',\n", - " learning_rate=0.1,\n", - " hidden_layer_size=HIDDEN_LAYER_SIZE,\n", - " steps=STEPS,\n", - " predict_mode='local',\n", - " analyze_mode='local',\n", - " analyze_slice_column='trip_start_hour'):\n", - " \n", - " \n", - " validation_output = '%s/{{workflow.name}}/validation' % output\n", - " transform_output = '%s/{{workflow.name}}/transformed' % output\n", - " training_output = '%s/{{workflow.name}}/train' % output\n", - " analysis_output = '%s/{{workflow.name}}/analysis' % output\n", - " prediction_output = '%s/{{workflow.name}}/predict' % output\n", - "\n", - " validation = dataflow_tf_data_validation_op(\n", - " train, evaluation, column_names, key_columns, project,\n", - " validation_mode, validation_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " preprocess = dataflow_tf_transform_op(\n", - " train, evaluation, validation.outputs['schema'], project, preprocess_mode,\n", - " preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " training = tf_train_op(\n", - " preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size,\n", - " steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " analysis = dataflow_tf_model_analyze_op(\n", - " training.output, evaluation, validation.outputs['schema'], project,\n", - " analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " prediction = dataflow_tf_predict_op(\n", - " evaluation, validation.outputs['schema'], target, training.output,\n", - " predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " \n", - " # The new deployer. 
Note that the DeployerOp interface is similar to the function \"deploy_model\".\n", - " deploy = DeployerOp(\n", - " gcp_project=project, model_name=model, version_name=version, runtime='1.9',\n", - " model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Submit a new job" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx.zip')\n", - "\n", - "run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.zip',\n", - " params={'output': OUTPUT_DIR,\n", - " 'project': PROJECT_NAME,\n", - " 'model': DEPLOYER_MODEL,\n", - " 'version': DEPLOYER_VERSION_PROD})\n", - "\n", - "result = client.wait_for_run_completion(run.id, timeout=600)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Customize a step in Python 2\n", - "Let's reuse the deploy_model function defined above. However, this time we will use Python 2 instead of the default Python 3." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# The return value \"DeployerOp\" represents a step that can be used directly in a pipeline function\n", - "# TODO: demonstrate the Python 2 support in another sample.\n", - "DeployerOp = compiler.build_python_component(\n", - " component_func=deploy_model,\n", - " staging_gcs_path=OUTPUT_DIR,\n", - " dependency=[kfp.compiler.VersionedDependency(name='google-api-python-client', version='1.7.0')],\n", - " base_image='tensorflow/tensorflow:1.12.0',\n", - " target_image=TARGET_IMAGE_TWO,\n", - " python_version='python2')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Modify the pipeline with the new deployer" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# My New Pipeline. 
It's almost the same as the original one with the last step deployer replaced.\n", - "@dsl.pipeline(\n", - " name='TFX Taxi Cab Classification Pipeline Example',\n", - " description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'\n", - ")\n", - "def my_taxi_cab_classification(\n", - " output,\n", - " project,\n", - " model,\n", - " version,\n", - " column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',\n", - " key_columns='trip_start_timestamp',\n", - " train=TRAIN_DATA,\n", - " evaluation=EVAL_DATA,\n", - " validation_mode='local',\n", - " preprocess_mode='local',\n", - " preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',\n", - " target='tips',\n", - " learning_rate=0.1,\n", - " hidden_layer_size=HIDDEN_LAYER_SIZE,\n", - " steps=STEPS,\n", - " predict_mode='local',\n", - " analyze_mode='local',\n", - " analyze_slice_column='trip_start_hour'):\n", - " \n", - " \n", - " validation_output = '%s/{{workflow.name}}/validation' % output\n", - " transform_output = '%s/{{workflow.name}}/transformed' % output\n", - " training_output = '%s/{{workflow.name}}/train' % output\n", - " analysis_output = '%s/{{workflow.name}}/analysis' % output\n", - " prediction_output = '%s/{{workflow.name}}/predict' % output\n", - "\n", - " validation = dataflow_tf_data_validation_op(\n", - " train, evaluation, column_names, key_columns, project,\n", - " validation_mode, validation_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " preprocess = dataflow_tf_transform_op(\n", - " train, evaluation, validation.outputs['schema'], project, preprocess_mode,\n", - " preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " training = tf_train_op(\n", - " preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size,\n", - " steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " analysis = dataflow_tf_model_analyze_op(\n", - " training.output, evaluation, validation.outputs['schema'], project,\n", - " analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " prediction = dataflow_tf_predict_op(\n", - " evaluation, validation.outputs['schema'], target, training.output,\n", - " predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", - " \n", - " # The new deployer. 
Note that the DeployerOp interface is similar to the function \"deploy_model\".\n", - " deploy = DeployerOp(\n", - " gcp_project=project, model_name=model, version_name=version, runtime='1.9',\n", - " model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Submit a new job" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx-two.tar.gz')\n", - "\n", - "run = client.run_pipeline(exp.id, 'my-tfx-two', 'my-tfx-two.tar.gz',\n", - " params={'output': OUTPUT_DIR,\n", - " 'project': PROJECT_NAME,\n", - " 'model': DEPLOYER_MODEL,\n", - " 'version': DEPLOYER_VERSION_PROD_TWO})\n", - "\n", - "result = client.wait_for_run_completion(run.id, timeout=600)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Clean up" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Activated service account credentials for: [kubeflow3-user@bradley-playground.iam.gserviceaccount.com]\n", - "Deleting version [prod]......done. \n", - "Deleting version [dev]......done. \n", - "Deleting model [notebook_tfx_taxi]...done. \n" - ] - } - ], - "source": [ - "# the step is only needed if you are using an in-cluster JupyterHub instance.\n", - "!gcloud auth activate-service-account --key-file /var/run/secrets/sa/user-gcp-sa.json\n", - "\n", - "\n", - "!gcloud ai-platform versions delete $DEPLOYER_VERSION_PROD --model $DEPLOYER_MODEL -q\n", - "!gcloud ai-platform versions delete $DEPLOYER_VERSION_PROD_TWO --model $DEPLOYER_MODEL -q\n", - "!gcloud ai-platform versions delete $DEPLOYER_VERSION_DEV --model $DEPLOYER_MODEL -q\n", - "!gcloud ai-platform models delete $DEPLOYER_MODEL -q" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.5" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/test/sample-test/run_test.sh b/test/sample-test/run_test.sh index bf5c10a932e..9e292fd8956 100755 --- a/test/sample-test/run_test.sh +++ b/test/sample-test/run_test.sh @@ -247,43 +247,6 @@ elif [[ "${TEST_NAME}" == "parallel_join" ]]; then elif [[ "${TEST_NAME}" == "recursion" ]]; then dsl-compile --py "${TEST_NAME}.py" --output "${TEST_NAME}.yaml" check_result ${TEST_NAME} -elif [[ "${TEST_NAME}" == "kubeflow_pipeline_using_TFX_OSS_components" ]]; then - # CMLE model name format: A name should start with a letter and contain only - # letters, numbers and underscores. 
- DEPLOYER_MODEL=`cat /proc/sys/kernel/random/uuid` - DEPLOYER_MODEL=Notebook_tfx_taxi_`echo ${DEPLOYER_MODEL//-/_}` - - export LC_ALL=C.UTF-8 - export LANG=C.UTF-8 - if [[ -n "${DATAFLOW_TFT_IMAGE}" ]]; then - papermill --prepare-only -p EXPERIMENT_NAME "${TEST_NAME}-test" -p OUTPUT_DIR \ - ${RESULTS_GCS_DIR} -p PROJECT_NAME ml-pipeline-test \ - -p BASE_IMAGE ${TARGET_IMAGE_PREFIX}pusherbase:dev -p TARGET_IMAGE \ - ${TARGET_IMAGE_PREFIX}pusher:dev -p TARGET_IMAGE_TWO \ - ${TARGET_IMAGE_PREFIX}pusher_two:dev \ - -p KFP_PACKAGE /tmp/kfp.tar.gz -p DEPLOYER_MODEL ${DEPLOYER_MODEL} \ - -p DATAFLOW_TFDV_IMAGE ${DATAFLOW_TFDV_IMAGE} -p DATAFLOW_TFT_IMAGE \ - ${DATAFLOW_TFT_IMAGE} -p DATAFLOW_TFMA_IMAGE ${DATAFLOW_TFMA_IMAGE} -p \ - DATAFLOW_TF_PREDICT_IMAGE ${DATAFLOW_PREDICT_IMAGE} \ - -p KUBEFLOW_TF_TRAINER_IMAGE ${KUBEFLOW_DNNTRAINER_IMAGE} -p \ - KUBEFLOW_DEPLOYER_IMAGE ${KUBEFLOW_DEPLOYER_IMAGE} \ - -p TRAIN_DATA gs://ml-pipeline-dataset/sample-test/taxi-cab-classification/train50.csv \ - -p EVAL_DATA gs://ml-pipeline-dataset/sample-test/taxi-cab-classification/eval20.csv \ - -p HIDDEN_LAYER_SIZE 10 -p STEPS 50 \ - "KubeFlow Pipeline Using TFX OSS Components.ipynb" "${TEST_NAME}.ipynb" - else - papermill --prepare-only -p EXPERIMENT_NAME "${TEST_NAME}-test" -p \ - OUTPUT_DIR ${RESULTS_GCS_DIR} -p PROJECT_NAME ml-pipeline-test \ - -p BASE_IMAGE ${TARGET_IMAGE_PREFIX}pusherbase:dev -p TARGET_IMAGE \ - ${TARGET_IMAGE_PREFIX}pusher:dev -p TARGET_IMAGE_TWO \ - ${TARGET_IMAGE_PREFIX}pusher_two:dev \ - -p KFP_PACKAGE /tmp/kfp.tar.gz -p DEPLOYER_MODEL ${DEPLOYER_MODEL} \ - -p TRAIN_DATA gs://ml-pipeline-dataset/sample-test/taxi-cab-classification/train50.csv \ - -p EVAL_DATA gs://ml-pipeline-dataset/sample-test/taxi-cab-classification/eval20.csv \ - -p HIDDEN_LAYER_SIZE 10 -p STEPS 50 \ - "KubeFlow Pipeline Using TFX OSS Components.ipynb" "${TEST_NAME}.ipynb" - fi - check_notebook_result ${TEST_NAME} elif [[ "${TEST_NAME}" == "lightweight_component" ]]; then export LC_ALL=C.UTF-8 export LANG=C.UTF-8 diff --git a/test/sample_test.yaml b/test/sample_test.yaml index 5ce101b4c02..bd1671787f0 100644 --- a/test/sample_test.yaml +++ b/test/sample_test.yaml @@ -69,7 +69,6 @@ spec: withItems: - tfx_cab_classification - xgboost_training_cm - - kubeflow_pipeline_using_TFX_OSS_components - lightweight_component - dsl_static_type_checking # Build and push image