diff --git a/samples/tfx-oss/README.md b/samples/tfx-oss/README.md index 6c03d65cf13..acc3f74360e 100644 --- a/samples/tfx-oss/README.md +++ b/samples/tfx-oss/README.md @@ -9,6 +9,8 @@ This pipeline demonstrates the TFX capabilities at scale. The pipeline uses a pub ## Setup +Enable DataFlow API for your GKE cluster: + Create a local Python 3.5 conda environment ``` conda create -n tfx-kfp pip python=3.5.3 @@ -29,7 +31,13 @@ git clone https://github.com/tensorflow/tfx Upload the utility code to your storage bucket. You can modify this code if needed for a different dataset. ``` -gsutil cp tfx/examples/chicago_taxi_pipeline/taxi_utils.py gs://my-bucket/ +gsutil cp tfx/examples/chicago_taxi_pipeline/taxi_utils.py gs://my-bucket/<directory>/ ``` + +If gsutil does not work, try `tensorflow.gfile`: +``` +from tensorflow import gfile +gfile.Copy('tfx/examples/chicago_taxi_pipeline/taxi_utils.py', 'gs://<bucket>/<directory>/taxi_utils.py') ``` ## Configure the TFX Pipeline @@ -39,9 +47,9 @@ Modify the pipeline configuration file at tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_large.py ``` Configure -- GCS storage bucket name (replace "my-bucket") -- GCP project ID (replace "my-gcp-project") -- Make sure the path to the taxi_utils.py is correct +- Set `_input_bucket` to the GCS directory where you've copied taxi_utils.py, i.e. gs://<bucket>/<directory>/ +- Set `_output_bucket` to the GCS directory where you want the results to be written +- Set GCP project ID (replace my-gcp-project). Note that it should be project ID, not project name. - The original BigQuery dataset has 100M rows, which can take time to process. Modify the selection criteria (% of records) to run a sample test. 
## Compile and run the pipeline diff --git a/samples/tfx-oss/TFX Example.ipynb b/samples/tfx-oss/TFX Example.ipynb index b5da68d1e47..fd7b488469b 100644 --- a/samples/tfx-oss/TFX Example.ipynb +++ b/samples/tfx-oss/TFX Example.ipynb @@ -6,6 +6,8 @@ "source": [ "# TFX on KubeFlow Pipelines Example\n", "\n", + "This notebook should be run inside a KF Pipelines cluster.\n", + "\n", "### Install TFX and KFP packages" ] }, @@ -19,6 +21,14 @@ "!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.10/kfp.tar.gz --upgrade\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Enable DataFlow API for your GKE cluster\n", + "\n" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -41,8 +51,9 @@ "metadata": {}, "outputs": [], "source": [ - "# copy the trainer code to a storage bucket \n", - "!gsutil cp tfx/examples/chicago_taxi_pipeline/taxi_utils.py gs:///" + "# copy the trainer code to a storage bucket as the TFX pipeline will need that code file in GCS\n", + "from tensorflow import gfile\n", + "gfile.Copy('tfx/examples/chicago_taxi_pipeline/taxi_utils.py', 'gs://<bucket>/<directory>/taxi_utils.py')" ] }, { @@ -57,9 +68,9 @@ "```\n", "\n", "Configure:\n", - "- GCS storage bucket name (replace my-bucket)\n", - "- GCP project ID (replace my-gcp-project)\n", - "- Make sure the path to the taxi_utils.py is correct\n", + "- Set `_input_bucket` to the GCS directory where you've copied taxi_utils.py, i.e. gs://<bucket>/<directory>/\n", + "- Set `_output_bucket` to the GCS directory where you want the results to be written\n", + "- Set GCP project ID (replace my-gcp-project). 
Note that it should be project ID, not project name.\n", "\n", "The dataset in BigQuery has 100M rows, you can change the query parameters in WHERE clause to limit the number of rows used.\n" ] @@ -89,12 +100,17 @@ "# Get or create a new experiment\n", "import kfp\n", "client = kfp.Client()\n", - "experiment = client.create_experiment(\"TFX Examples\")\n", - "pipeline_filename = \"chicago_taxi_pipeline_kubeflow.tar.gz\"\n", + "experiment_name='TFX Examples'\n", + "try:\n", + " experiment_id = client.get_experiment(experiment_name=experiment_name).id\n", + "except:\n", + " experiment_id = client.create_experiment(experiment_name).id\n", + "\n", + "pipeline_filename = 'chicago_taxi_pipeline_kubeflow.tar.gz'\n", "\n", "#Submit a pipeline run\n", "run_name = 'Run 1'\n", - "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, {})\n" + "run_result = client.run_pipeline(experiment_id, run_name, pipeline_filename, {})\n" ] } ],