From 51b16a9e3745207f0aebbbaeb877e11c24639f93 Mon Sep 17 00:00:00 2001 From: Makoto Uchida Date: Wed, 23 Oct 2019 12:59:07 -0700 Subject: [PATCH 1/5] Update samples/core/ai_platform pipeline to follow data dependency --- samples/core/ai_platform/ai_platform.ipynb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/samples/core/ai_platform/ai_platform.ipynb b/samples/core/ai_platform/ai_platform.ipynb index 7ac1119ee5d..00b38d575fe 100644 --- a/samples/core/ai_platform/ai_platform.ipynb +++ b/samples/core/ai_platform/ai_platform.ipynb @@ -95,10 +95,6 @@ "TRAINER_OUTPUT_GCS_PATH = output + '/train/output/' + str(int(time.time())) + '/'\n", "DATA_GCS_PATH = output + '/reports.csv'\n", "PYTHON_MODULE = 'trainer.task'\n", - "TRAINER_ARGS = json.dumps([\n", - " '--data-file-url', DATA_GCS_PATH,\n", - " '--job-dir', output\n", - "])\n", "PIPELINE_NAME = 'Chicago Crime Prediction'\n", "PIPELINE_FILENAME_PREFIX = 'chicago'\n", "PIPELINE_DESCRIPTION = ''\n", @@ -168,7 +164,7 @@ " region,\n", " python_module,\n", " runtime_version):\n", - " \n", + "\n", " return mlengine_train_op(\n", " project_id=project_id, \n", " python_module=python_module,\n", @@ -242,19 +238,23 @@ " runtime_version=RUNTIME_VERSION,\n", " package_uris=PACKAGE_URIS,\n", " trainer_output_gcs_path=TRAINER_OUTPUT_GCS_PATH,\n", - " trainer_args=TRAINER_ARGS,\n", "): \n", " download_task = download(project_id,\n", " data_gcs_path)\n", "\n", " train_task = train(project_id,\n", - " trainer_args,\n", + " json.dumps(\n", + " ['--data-file-url',\n", + " '%s' % download_task.outputs['output_gcs_path'],\n", + " '--job-dir',\n", + " output]\n", + " ),\n", " package_uris,\n", " trainer_output_gcs_path,\n", " gcs_working_dir,\n", " region,\n", " python_module,\n", - " runtime_version).after(download_task)\n", + " runtime_version)\n", " \n", " deploy_task = deploy(project_id,\n", " train_task.outputs['job_dir'],\n", @@ -334,7 +334,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.4" + "version": "3.6.7" }, "pycharm": { "stem_cell": { From d178be55b8361ae80819a14f366e53161e53bcb3 Mon Sep 17 00:00:00 2001 From: Makoto Uchida Date: Fri, 15 Nov 2019 14:40:16 -0800 Subject: [PATCH 2/5] Modernize samples/core/tfx-oss demonstration --- samples/core/tfx-oss/TFX Example.ipynb | 263 +++---------------------- 1 file changed, 24 insertions(+), 239 deletions(-) diff --git a/samples/core/tfx-oss/TFX Example.ipynb b/samples/core/tfx-oss/TFX Example.ipynb index 1a377aa704a..67e487363d9 100644 --- a/samples/core/tfx-oss/TFX Example.ipynb +++ b/samples/core/tfx-oss/TFX Example.ipynb @@ -14,11 +14,13 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ - "!pip3 install 'tfx==0.14.0' --upgrade\n", - "!python3 -m pip install 'kfp>=0.1.31' --quiet\n" + "!pip3 install 'tfx==0.15.0' --upgrade\n", + "!python3 -m pip install 'kfp>=0.1.35' --quiet" ] }, { @@ -40,6 +42,7 @@ "cell_type": "code", "execution_count": null, "metadata": { + "scrolled": true, "tags": [ "parameters" ] @@ -59,11 +62,13 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "# copy the trainer code to a storage bucket as the TFX pipeline will need that code file in GCS\n", - "from tensorflow import gfile\n", + "from tensorflow.compat.v1 import gfile\n", "gfile.Copy('utils/taxi_utils.py', _input_bucket + '/taxi_utils.py')" ] }, @@ -75,7 +80,7 @@ "\n", "Reload this cell by running the load command to get the pipeline configuration file\n", "```\n", - "%load tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow.py\n", + "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py\n", "```\n", "\n", "Configure:\n", @@ -89,179 +94,12 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ - "\"\"\"Chicago Taxi example using TFX DSL on Kubeflow.\"\"\"\n", - "\n", - "from __future__ import absolute_import\n", - "from __future__ import division\n", - "from __future__ import print_function\n", - "\n", - "import os\n", - "from tfx.components.evaluator.component import Evaluator\n", - "from tfx.components.example_gen.big_query_example_gen.component import BigQueryExampleGen\n", - "from tfx.components.example_validator.component import ExampleValidator\n", - "from tfx.components.model_validator.component import ModelValidator\n", - "from tfx.components.pusher.component import Pusher\n", - "from tfx.components.schema_gen.component import SchemaGen\n", - "from tfx.components.statistics_gen.component import StatisticsGen\n", - "from tfx.components.trainer.component import Trainer\n", - "from tfx.components.transform.component import Transform\n", - "from tfx.orchestration.kubeflow.runner import KubeflowDagRunner\n", - "from tfx.orchestration.pipeline import PipelineDecorator\n", - "from tfx.proto import evaluator_pb2\n", - "from tfx.proto import pusher_pb2\n", - "from tfx.proto import trainer_pb2\n", - "\n", - "# Python module file to inject customized logic into the TFX components. The\n", - "# Transform and Trainer both require user-defined functions to run successfully.\n", - "# Copy this from the current directory to a GCS bucket and update the location\n", - "# below.\n", - "_taxi_utils = os.path.join(_input_bucket, 'taxi_utils.py')\n", - "\n", - "# Path which can be listened to by the model server. Pusher will output the\n", - "# trained model here.\n", - "_serving_model_dir = os.path.join(_output_bucket, 'serving_model/taxi_bigquery')\n", - "\n", - "# Region to use for Dataflow jobs and CMLE training.\n", - "# Dataflow: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\n", - "# CMLE: https://cloud.google.com/ml-engine/docs/tensorflow/regions\n", - "_gcp_region = 'us-central1'\n", - "\n", - "# A dict which contains the training job parameters to be passed to Google\n", - "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", - "# Engine, refer to\n", - "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job\n", - "_cmle_training_args = {\n", - " 'pythonModule': None, # Will be populated by TFX\n", - " 'args': None, # Will be populated by TFX\n", - " 'region': _gcp_region,\n", - " 'jobDir': os.path.join(_output_bucket, 'tmp'),\n", - " 'runtimeVersion': '1.12',\n", - " 'pythonVersion': '2.7',\n", - " 'project': _project_id,\n", - "}\n", - "\n", - "# A dict which contains the serving job parameters to be passed to Google\n", - "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", - "# Engine, refer to\n", - "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models\n", - "_cmle_serving_args = {\n", - " 'model_name': 'chicago_taxi',\n", - " 'project_id': _project_id,\n", - " 'runtime_version': '1.12',\n", - "}\n", - "\n", - "# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.\n", - "# The full taxi dataset is > 120M record. In the interest of resource\n", - "# savings and time, we've set the default for this example to be much smaller.\n", - "# Feel free to crank it up and process the full dataset!\n", - "_query_sample_rate = 0.001 # Generate a 0.1% random sample.\n", - "\n", - "\n", - "# TODO(zhitaoli): Remove PipelineDecorator after 0.13.0.\n", - "@PipelineDecorator(\n", - " pipeline_name='chicago_taxi_pipeline_kubeflow',\n", - " log_root='/var/tmp/tfx/logs',\n", - " pipeline_root=_pipeline_root,\n", - " additional_pipeline_args={\n", - " 'beam_pipeline_args': [\n", - " '--runner=DataflowRunner',\n", - " '--experiments=shuffle_mode=auto',\n", - " '--project=' + _project_id,\n", - " '--temp_location=' + os.path.join(_output_bucket, 'tmp'),\n", - " '--region=' + _gcp_region,\n", - " ],\n", - " # Optional args:\n", - " # 'tfx_image': custom docker image to use for components. This is needed\n", - " # if TFX package is not installed from an RC or released version.\n", - " })\n", - "def _create_pipeline():\n", - " \"\"\"Implements the chicago taxi pipeline with TFX.\"\"\"\n", - "\n", - " query = \"\"\"\n", - " SELECT\n", - " pickup_community_area,\n", - " fare,\n", - " EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,\n", - " EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,\n", - " EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,\n", - " UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,\n", - " pickup_latitude,\n", - " pickup_longitude,\n", - " dropoff_latitude,\n", - " dropoff_longitude,\n", - " trip_miles,\n", - " pickup_census_tract,\n", - " dropoff_census_tract,\n", - " payment_type,\n", - " company,\n", - " trip_seconds,\n", - " dropoff_community_area,\n", - " tips\n", - " FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`\n", - " WHERE RAND() < {}\"\"\".format(_query_sample_rate)\n", - "\n", - " # Brings data into the pipeline or otherwise joins/converts training data.\n", - " example_gen = BigQueryExampleGen(query=query)\n", - "\n", - " # Computes statistics over data for visualization and example validation.\n", - " statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)\n", - "\n", - " # Generates schema based on statistics files.\n", - " infer_schema = SchemaGen(stats=statistics_gen.outputs.output)\n", - "\n", - " # Performs anomaly detection based on statistics and data schema.\n", - " validate_stats = ExampleValidator(\n", - " stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)\n", - "\n", - " # Performs transformations and feature engineering in training and serving.\n", - " transform = Transform(\n", - " input_data=example_gen.outputs.examples,\n", - " schema=infer_schema.outputs.output,\n", - " module_file=_taxi_utils)\n", - "\n", - " # Uses user-provided Python function that implements a model using TF-Learn.\n", - " trainer = Trainer(\n", - " module_file=_taxi_utils,\n", - " transformed_examples=transform.outputs.transformed_examples,\n", - " schema=infer_schema.outputs.output,\n", - " transform_output=transform.outputs.transform_output,\n", - " train_args=trainer_pb2.TrainArgs(num_steps=10000),\n", - " eval_args=trainer_pb2.EvalArgs(num_steps=5000),\n", - " custom_config={'cmle_training_args': _cmle_training_args})\n", - "\n", - " # Uses TFMA to compute a evaluation statistics over features of a model.\n", - " model_analyzer = Evaluator(\n", - " examples=example_gen.outputs.examples,\n", - " model_exports=trainer.outputs.output,\n", - " feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", - " evaluator_pb2.SingleSlicingSpec(\n", - " column_for_slicing=['trip_start_hour'])\n", - " ]))\n", - "\n", - " # Performs quality validation of a candidate model (compared to a baseline).\n", - " model_validator = ModelValidator(\n", - " examples=example_gen.outputs.examples, model=trainer.outputs.output)\n", - "\n", - " # Checks whether the model passed the validation steps and pushes the model\n", - " # to a file destination if check passed.\n", - " pusher = Pusher(\n", - " model_export=trainer.outputs.output,\n", - " model_blessing=model_validator.outputs.blessing,\n", - " custom_config={'cmle_serving_args': _cmle_serving_args},\n", - " push_destination=pusher_pb2.PushDestination(\n", - " filesystem=pusher_pb2.PushDestination.Filesystem(\n", - " base_directory=_serving_model_dir)))\n", - "\n", - " return [\n", - " example_gen, statistics_gen, infer_schema, validate_stats, transform,\n", - " trainer, model_analyzer, model_validator, pusher\n", - " ]\n", - "\n", - "\n", - "pipeline = KubeflowDagRunner().run(_create_pipeline())" + "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py" ] }, { @@ -274,69 +112,16 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "import kfp\n", - "run_result = kfp.Client().create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Connect to the ML Metadata Store" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip3 install ml_metadata" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from ml_metadata.metadata_store import metadata_store\n", - "from ml_metadata.proto import metadata_store_pb2\n", - "import os\n", - "\n", - "connection_config = metadata_store_pb2.ConnectionConfig()\n", - "connection_config.mysql.host = os.getenv('MYSQL_SERVICE_HOST')\n", - "connection_config.mysql.port = int(os.getenv('MYSQL_SERVICE_PORT'))\n", - "connection_config.mysql.database = 'mlmetadata'\n", - "connection_config.mysql.user = 'root'\n", - "store = metadata_store.MetadataStore(connection_config)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get all output artifacts\n", - "store.get_artifacts()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get a specific artifact type\n", - "\n", - "# TFX types \n", - "# types = ['ModelExportPath', 'ExamplesPath', 'ModelBlessingPath', 'ModelPushPath', 'TransformPath', 'SchemaPath']\n", "\n", - "store.get_artifacts_by_type('ExamplesPath')" + "run_result = kfp.Client(\n", + " host=None # replace with Kubeflow Pipelines endpoint if it this is run outside of the Kubeflow cluster.\n", + ").create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" ] } ], @@ -356,15 +141,15 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.5.6" }, "pycharm": { "stem_cell": { "cell_type": "raw", - "source": [], "metadata": { "collapsed": false - } + }, + "source": [] } } }, From effcfa88bfc72e508359f3ea5051c68fb035b9d6 Mon Sep 17 00:00:00 2001 From: Makoto Uchida Date: Fri, 15 Nov 2019 18:17:30 -0800 Subject: [PATCH 3/5] Revert "Modernize samples/core/tfx-oss demonstration" This reverts commit d178be55b8361ae80819a14f366e53161e53bcb3. --- samples/core/tfx-oss/TFX Example.ipynb | 263 ++++++++++++++++++++++--- 1 file changed, 239 insertions(+), 24 deletions(-) diff --git a/samples/core/tfx-oss/TFX Example.ipynb b/samples/core/tfx-oss/TFX Example.ipynb index 67e487363d9..1a377aa704a 100644 --- a/samples/core/tfx-oss/TFX Example.ipynb +++ b/samples/core/tfx-oss/TFX Example.ipynb @@ -14,13 +14,11 @@ { "cell_type": "code", "execution_count": null, - "metadata": { - "scrolled": true - }, + "metadata": {}, "outputs": [], "source": [ - "!pip3 install 'tfx==0.15.0' --upgrade\n", - "!python3 -m pip install 'kfp>=0.1.35' --quiet" + "!pip3 install 'tfx==0.14.0' --upgrade\n", + "!python3 -m pip install 'kfp>=0.1.31' --quiet\n" ] }, { @@ -42,7 +40,6 @@ "cell_type": "code", "execution_count": null, "metadata": { - "scrolled": true, "tags": [ "parameters" ] @@ -62,13 +59,11 @@ { "cell_type": "code", "execution_count": null, - "metadata": { - "scrolled": true - }, + "metadata": {}, "outputs": [], "source": [ "# copy the trainer code to a storage bucket as the TFX pipeline will need that code file in GCS\n", - "from tensorflow.compat.v1 import gfile\n", + "from tensorflow import gfile\n", "gfile.Copy('utils/taxi_utils.py', _input_bucket + '/taxi_utils.py')" ] }, @@ -80,7 +75,7 @@ "\n", "Reload this cell by running the load command to get the pipeline configuration file\n", "```\n", - "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py\n", + "%load tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow.py\n", "```\n", "\n", "Configure:\n", @@ -94,12 +89,179 @@ { "cell_type": "code", "execution_count": null, - "metadata": { - "scrolled": true - }, + "metadata": {}, "outputs": [], "source": [ - "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py" + "\"\"\"Chicago Taxi example using TFX DSL on Kubeflow.\"\"\"\n", + "\n", + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import os\n", + "from tfx.components.evaluator.component import Evaluator\n", + "from tfx.components.example_gen.big_query_example_gen.component import BigQueryExampleGen\n", + "from tfx.components.example_validator.component import ExampleValidator\n", + "from tfx.components.model_validator.component import ModelValidator\n", + "from tfx.components.pusher.component import Pusher\n", + "from tfx.components.schema_gen.component import SchemaGen\n", + "from tfx.components.statistics_gen.component import StatisticsGen\n", + "from tfx.components.trainer.component import Trainer\n", + "from tfx.components.transform.component import Transform\n", + "from tfx.orchestration.kubeflow.runner import KubeflowDagRunner\n", + "from tfx.orchestration.pipeline import PipelineDecorator\n", + "from tfx.proto import evaluator_pb2\n", + "from tfx.proto import pusher_pb2\n", + "from tfx.proto import trainer_pb2\n", + "\n", + "# Python module file to inject customized logic into the TFX components. The\n", + "# Transform and Trainer both require user-defined functions to run successfully.\n", + "# Copy this from the current directory to a GCS bucket and update the location\n", + "# below.\n", + "_taxi_utils = os.path.join(_input_bucket, 'taxi_utils.py')\n", + "\n", + "# Path which can be listened to by the model server. Pusher will output the\n", + "# trained model here.\n", + "_serving_model_dir = os.path.join(_output_bucket, 'serving_model/taxi_bigquery')\n", + "\n", + "# Region to use for Dataflow jobs and CMLE training.\n", + "# Dataflow: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\n", + "# CMLE: https://cloud.google.com/ml-engine/docs/tensorflow/regions\n", + "_gcp_region = 'us-central1'\n", + "\n", + "# A dict which contains the training job parameters to be passed to Google\n", + "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", + "# Engine, refer to\n", + "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job\n", + "_cmle_training_args = {\n", + " 'pythonModule': None, # Will be populated by TFX\n", + " 'args': None, # Will be populated by TFX\n", + " 'region': _gcp_region,\n", + " 'jobDir': os.path.join(_output_bucket, 'tmp'),\n", + " 'runtimeVersion': '1.12',\n", + " 'pythonVersion': '2.7',\n", + " 'project': _project_id,\n", + "}\n", + "\n", + "# A dict which contains the serving job parameters to be passed to Google\n", + "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", + "# Engine, refer to\n", + "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models\n", + "_cmle_serving_args = {\n", + " 'model_name': 'chicago_taxi',\n", + " 'project_id': _project_id,\n", + " 'runtime_version': '1.12',\n", + "}\n", + "\n", + "# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.\n", + "# The full taxi dataset is > 120M record. In the interest of resource\n", + "# savings and time, we've set the default for this example to be much smaller.\n", + "# Feel free to crank it up and process the full dataset!\n", + "_query_sample_rate = 0.001 # Generate a 0.1% random sample.\n", + "\n", + "\n", + "# TODO(zhitaoli): Remove PipelineDecorator after 0.13.0.\n", + "@PipelineDecorator(\n", + " pipeline_name='chicago_taxi_pipeline_kubeflow',\n", + " log_root='/var/tmp/tfx/logs',\n", + " pipeline_root=_pipeline_root,\n", + " additional_pipeline_args={\n", + " 'beam_pipeline_args': [\n", + " '--runner=DataflowRunner',\n", + " '--experiments=shuffle_mode=auto',\n", + " '--project=' + _project_id,\n", + " '--temp_location=' + os.path.join(_output_bucket, 'tmp'),\n", + " '--region=' + _gcp_region,\n", + " ],\n", + " # Optional args:\n", + " # 'tfx_image': custom docker image to use for components. This is needed\n", + " # if TFX package is not installed from an RC or released version.\n", + " })\n", + "def _create_pipeline():\n", + " \"\"\"Implements the chicago taxi pipeline with TFX.\"\"\"\n", + "\n", + " query = \"\"\"\n", + " SELECT\n", + " pickup_community_area,\n", + " fare,\n", + " EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,\n", + " EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,\n", + " EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,\n", + " UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,\n", + " pickup_latitude,\n", + " pickup_longitude,\n", + " dropoff_latitude,\n", + " dropoff_longitude,\n", + " trip_miles,\n", + " pickup_census_tract,\n", + " dropoff_census_tract,\n", + " payment_type,\n", + " company,\n", + " trip_seconds,\n", + " dropoff_community_area,\n", + " tips\n", + " FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`\n", + " WHERE RAND() < {}\"\"\".format(_query_sample_rate)\n", + "\n", + " # Brings data into the pipeline or otherwise joins/converts training data.\n", + " example_gen = BigQueryExampleGen(query=query)\n", + "\n", + " # Computes statistics over data for visualization and example validation.\n", + " statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)\n", + "\n", + " # Generates schema based on statistics files.\n", + " infer_schema = SchemaGen(stats=statistics_gen.outputs.output)\n", + "\n", + " # Performs anomaly detection based on statistics and data schema.\n", + " validate_stats = ExampleValidator(\n", + " stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)\n", + "\n", + " # Performs transformations and feature engineering in training and serving.\n", + " transform = Transform(\n", + " input_data=example_gen.outputs.examples,\n", + " schema=infer_schema.outputs.output,\n", + " module_file=_taxi_utils)\n", + "\n", + " # Uses user-provided Python function that implements a model using TF-Learn.\n", + " trainer = Trainer(\n", + " module_file=_taxi_utils,\n", + " transformed_examples=transform.outputs.transformed_examples,\n", + " schema=infer_schema.outputs.output,\n", + " transform_output=transform.outputs.transform_output,\n", + " train_args=trainer_pb2.TrainArgs(num_steps=10000),\n", + " eval_args=trainer_pb2.EvalArgs(num_steps=5000),\n", + " custom_config={'cmle_training_args': _cmle_training_args})\n", + "\n", + " # Uses TFMA to compute a evaluation statistics over features of a model.\n", + " model_analyzer = Evaluator(\n", + " examples=example_gen.outputs.examples,\n", + " model_exports=trainer.outputs.output,\n", + " feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", + " evaluator_pb2.SingleSlicingSpec(\n", + " column_for_slicing=['trip_start_hour'])\n", + " ]))\n", + "\n", + " # Performs quality validation of a candidate model (compared to a baseline).\n", + " model_validator = ModelValidator(\n", + " examples=example_gen.outputs.examples, model=trainer.outputs.output)\n", + "\n", + " # Checks whether the model passed the validation steps and pushes the model\n", + " # to a file destination if check passed.\n", + " pusher = Pusher(\n", + " model_export=trainer.outputs.output,\n", + " model_blessing=model_validator.outputs.blessing,\n", + " custom_config={'cmle_serving_args': _cmle_serving_args},\n", + " push_destination=pusher_pb2.PushDestination(\n", + " filesystem=pusher_pb2.PushDestination.Filesystem(\n", + " base_directory=_serving_model_dir)))\n", + "\n", + " return [\n", + " example_gen, statistics_gen, infer_schema, validate_stats, transform,\n", + " trainer, model_analyzer, model_validator, pusher\n", + " ]\n", + "\n", + "\n", + "pipeline = KubeflowDagRunner().run(_create_pipeline())" ] }, { @@ -112,16 +274,69 @@ { "cell_type": "code", "execution_count": null, - "metadata": { - "scrolled": true - }, + "metadata": {}, "outputs": [], "source": [ "import kfp\n", + "run_result = kfp.Client().create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Connect to the ML Metadata Store" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip3 install ml_metadata" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from ml_metadata.metadata_store import metadata_store\n", + "from ml_metadata.proto import metadata_store_pb2\n", + "import os\n", + "\n", + "connection_config = metadata_store_pb2.ConnectionConfig()\n", + "connection_config.mysql.host = os.getenv('MYSQL_SERVICE_HOST')\n", + "connection_config.mysql.port = int(os.getenv('MYSQL_SERVICE_PORT'))\n", + "connection_config.mysql.database = 'mlmetadata'\n", + "connection_config.mysql.user = 'root'\n", + "store = metadata_store.MetadataStore(connection_config)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get all output artifacts\n", + "store.get_artifacts()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get a specific artifact type\n", + "\n", + "# TFX types \n", + "# types = ['ModelExportPath', 'ExamplesPath', 'ModelBlessingPath', 'ModelPushPath', 'TransformPath', 'SchemaPath']\n", "\n", - "run_result = kfp.Client(\n", - " host=None # replace with Kubeflow Pipelines endpoint if it this is run outside of the Kubeflow cluster.\n", - ").create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" + "store.get_artifacts_by_type('ExamplesPath')" ] } ], @@ -141,15 +356,15 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.5.6" + "version": "3.6.7" }, "pycharm": { "stem_cell": { "cell_type": "raw", + "source": [], "metadata": { "collapsed": false - }, - "source": [] + } } } }, From 60005feb21519b2b279ff5449906adb79d6a5c06 Mon Sep 17 00:00:00 2001 From: Makoto Uchida Date: Fri, 15 Nov 2019 18:25:37 -0800 Subject: [PATCH 4/5] Modernize samples/core/tfx-oss demonstration --- samples/core/tfx-oss/TFX Example.ipynb | 263 +++---------------------- 1 file changed, 24 insertions(+), 239 deletions(-) diff --git a/samples/core/tfx-oss/TFX Example.ipynb b/samples/core/tfx-oss/TFX Example.ipynb index 1a377aa704a..67e487363d9 100644 --- a/samples/core/tfx-oss/TFX Example.ipynb +++ b/samples/core/tfx-oss/TFX Example.ipynb @@ -14,11 +14,13 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ - "!pip3 install 'tfx==0.14.0' --upgrade\n", - "!python3 -m pip install 'kfp>=0.1.31' --quiet\n" + "!pip3 install 'tfx==0.15.0' --upgrade\n", + "!python3 -m pip install 'kfp>=0.1.35' --quiet" ] }, { @@ -40,6 +42,7 @@ "cell_type": "code", "execution_count": null, "metadata": { + "scrolled": true, "tags": [ "parameters" ] @@ -59,11 +62,13 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "# copy the trainer code to a storage bucket as the TFX pipeline will need that code file in GCS\n", - "from tensorflow import gfile\n", + "from tensorflow.compat.v1 import gfile\n", "gfile.Copy('utils/taxi_utils.py', _input_bucket + '/taxi_utils.py')" ] }, @@ -75,7 +80,7 @@ "\n", "Reload this cell by running the load command to get the pipeline configuration file\n", "```\n", - "%load tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow.py\n", + "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py\n", "```\n", "\n", "Configure:\n", @@ -89,179 +94,12 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ - "\"\"\"Chicago Taxi example using TFX DSL on Kubeflow.\"\"\"\n", - "\n", - "from __future__ import absolute_import\n", - "from __future__ import division\n", - "from __future__ import print_function\n", - "\n", - "import os\n", - "from tfx.components.evaluator.component import Evaluator\n", - "from tfx.components.example_gen.big_query_example_gen.component import BigQueryExampleGen\n", - "from tfx.components.example_validator.component import ExampleValidator\n", - "from tfx.components.model_validator.component import ModelValidator\n", - "from tfx.components.pusher.component import Pusher\n", - "from tfx.components.schema_gen.component import SchemaGen\n", - "from tfx.components.statistics_gen.component import StatisticsGen\n", - "from tfx.components.trainer.component import Trainer\n", - "from tfx.components.transform.component import Transform\n", - "from tfx.orchestration.kubeflow.runner import KubeflowDagRunner\n", - "from tfx.orchestration.pipeline import PipelineDecorator\n", - "from tfx.proto import evaluator_pb2\n", - "from tfx.proto import pusher_pb2\n", - "from tfx.proto import trainer_pb2\n", - "\n", - "# Python module file to inject customized logic into the TFX components. The\n", - "# Transform and Trainer both require user-defined functions to run successfully.\n", - "# Copy this from the current directory to a GCS bucket and update the location\n", - "# below.\n", - "_taxi_utils = os.path.join(_input_bucket, 'taxi_utils.py')\n", - "\n", - "# Path which can be listened to by the model server. Pusher will output the\n", - "# trained model here.\n", - "_serving_model_dir = os.path.join(_output_bucket, 'serving_model/taxi_bigquery')\n", - "\n", - "# Region to use for Dataflow jobs and CMLE training.\n", - "# Dataflow: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\n", - "# CMLE: https://cloud.google.com/ml-engine/docs/tensorflow/regions\n", - "_gcp_region = 'us-central1'\n", - "\n", - "# A dict which contains the training job parameters to be passed to Google\n", - "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", - "# Engine, refer to\n", - "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job\n", - "_cmle_training_args = {\n", - " 'pythonModule': None, # Will be populated by TFX\n", - " 'args': None, # Will be populated by TFX\n", - " 'region': _gcp_region,\n", - " 'jobDir': os.path.join(_output_bucket, 'tmp'),\n", - " 'runtimeVersion': '1.12',\n", - " 'pythonVersion': '2.7',\n", - " 'project': _project_id,\n", - "}\n", - "\n", - "# A dict which contains the serving job parameters to be passed to Google\n", - "# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n", - "# Engine, refer to\n", - "# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models\n", - "_cmle_serving_args = {\n", - " 'model_name': 'chicago_taxi',\n", - " 'project_id': _project_id,\n", - " 'runtime_version': '1.12',\n", - "}\n", - "\n", - "# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.\n", - "# The full taxi dataset is > 120M record. In the interest of resource\n", - "# savings and time, we've set the default for this example to be much smaller.\n", - "# Feel free to crank it up and process the full dataset!\n", - "_query_sample_rate = 0.001 # Generate a 0.1% random sample.\n", - "\n", - "\n", - "# TODO(zhitaoli): Remove PipelineDecorator after 0.13.0.\n", - "@PipelineDecorator(\n", - " pipeline_name='chicago_taxi_pipeline_kubeflow',\n", - " log_root='/var/tmp/tfx/logs',\n", - " pipeline_root=_pipeline_root,\n", - " additional_pipeline_args={\n", - " 'beam_pipeline_args': [\n", - " '--runner=DataflowRunner',\n", - " '--experiments=shuffle_mode=auto',\n", - " '--project=' + _project_id,\n", - " '--temp_location=' + os.path.join(_output_bucket, 'tmp'),\n", - " '--region=' + _gcp_region,\n", - " ],\n", - " # Optional args:\n", - " # 'tfx_image': custom docker image to use for components. This is needed\n", - " # if TFX package is not installed from an RC or released version.\n", - " })\n", - "def _create_pipeline():\n", - " \"\"\"Implements the chicago taxi pipeline with TFX.\"\"\"\n", - "\n", - " query = \"\"\"\n", - " SELECT\n", - " pickup_community_area,\n", - " fare,\n", - " EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,\n", - " EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,\n", - " EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,\n", - " UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,\n", - " pickup_latitude,\n", - " pickup_longitude,\n", - " dropoff_latitude,\n", - " dropoff_longitude,\n", - " trip_miles,\n", - " pickup_census_tract,\n", - " dropoff_census_tract,\n", - " payment_type,\n", - " company,\n", - " trip_seconds,\n", - " dropoff_community_area,\n", - " tips\n", - " FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`\n", - " WHERE RAND() < {}\"\"\".format(_query_sample_rate)\n", - "\n", - " # Brings data into the pipeline or otherwise joins/converts training data.\n", - " example_gen = BigQueryExampleGen(query=query)\n", - "\n", - " # Computes statistics over data for visualization and example validation.\n", - " statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)\n", - "\n", - " # Generates schema based on statistics files.\n", - " infer_schema = SchemaGen(stats=statistics_gen.outputs.output)\n", - "\n", - " # Performs anomaly detection based on statistics and data schema.\n", - " validate_stats = ExampleValidator(\n", - " stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)\n", - "\n", - " # Performs transformations and feature engineering in training and serving.\n", - " transform = Transform(\n", - " input_data=example_gen.outputs.examples,\n", - " schema=infer_schema.outputs.output,\n", - " module_file=_taxi_utils)\n", - "\n", - " # Uses user-provided Python function that implements a model using TF-Learn.\n", - " trainer = Trainer(\n", - " module_file=_taxi_utils,\n", - " transformed_examples=transform.outputs.transformed_examples,\n", - " schema=infer_schema.outputs.output,\n", - " transform_output=transform.outputs.transform_output,\n", - " train_args=trainer_pb2.TrainArgs(num_steps=10000),\n", - " eval_args=trainer_pb2.EvalArgs(num_steps=5000),\n", - " custom_config={'cmle_training_args': _cmle_training_args})\n", - "\n", - " # Uses TFMA to compute a evaluation statistics over features of a model.\n", - " model_analyzer = Evaluator(\n", - " examples=example_gen.outputs.examples,\n", - " model_exports=trainer.outputs.output,\n", - " feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", - " evaluator_pb2.SingleSlicingSpec(\n", - " column_for_slicing=['trip_start_hour'])\n", - " ]))\n", - "\n", - " # Performs quality validation of a candidate model (compared to a baseline).\n", - " model_validator = ModelValidator(\n", - " examples=example_gen.outputs.examples, model=trainer.outputs.output)\n", - "\n", - " # Checks whether the model passed the validation steps and pushes the model\n", - " # to a file destination if check passed.\n", - " pusher = Pusher(\n", - " model_export=trainer.outputs.output,\n", - " model_blessing=model_validator.outputs.blessing,\n", - " custom_config={'cmle_serving_args': _cmle_serving_args},\n", - " push_destination=pusher_pb2.PushDestination(\n", - " filesystem=pusher_pb2.PushDestination.Filesystem(\n", - " base_directory=_serving_model_dir)))\n", - "\n", - " return [\n", - " example_gen, statistics_gen, infer_schema, validate_stats, transform,\n", - " trainer, model_analyzer, model_validator, pusher\n", - " ]\n", - "\n", - "\n", - "pipeline = KubeflowDagRunner().run(_create_pipeline())" + "%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py" ] }, { @@ -274,69 +112,16 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "import kfp\n", - "run_result = kfp.Client().create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Connect to the ML Metadata Store" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip3 install ml_metadata" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from ml_metadata.metadata_store import metadata_store\n", - "from ml_metadata.proto import metadata_store_pb2\n", - "import os\n", - "\n", - "connection_config = metadata_store_pb2.ConnectionConfig()\n", - "connection_config.mysql.host = os.getenv('MYSQL_SERVICE_HOST')\n", - "connection_config.mysql.port = int(os.getenv('MYSQL_SERVICE_PORT'))\n", - "connection_config.mysql.database = 'mlmetadata'\n", - "connection_config.mysql.user = 'root'\n", - "store = metadata_store.MetadataStore(connection_config)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get all output artifacts\n", - "store.get_artifacts()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Get a specific artifact type\n", - "\n", - "# TFX types \n", - "# types = ['ModelExportPath', 'ExamplesPath', 'ModelBlessingPath', 'ModelPushPath', 'TransformPath', 'SchemaPath']\n", "\n", - "store.get_artifacts_by_type('ExamplesPath')" + "run_result = kfp.Client(\n", + " host=None # replace with Kubeflow Pipelines endpoint if it this is run outside of the Kubeflow cluster.\n", + ").create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" ] } ], @@ -356,15 +141,15 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.5.6" }, "pycharm": { "stem_cell": { "cell_type": "raw", - "source": [], "metadata": { "collapsed": false - } + }, + "source": [] } } }, From 9a4424b52af0c7db21dded44251c3d98d81bb57c Mon Sep 17 00:00:00 2001 From: Makoto Uchida Date: Fri, 15 Nov 2019 18:26:47 -0800 Subject: [PATCH 5/5] Fix comment style --- samples/core/tfx-oss/TFX Example.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/core/tfx-oss/TFX Example.ipynb b/samples/core/tfx-oss/TFX Example.ipynb index 67e487363d9..aa3a40f4a5f 100644 --- a/samples/core/tfx-oss/TFX Example.ipynb +++ b/samples/core/tfx-oss/TFX Example.ipynb @@ -120,7 +120,7 @@ "import kfp\n", "\n", "run_result = kfp.Client(\n", - " host=None # replace with Kubeflow Pipelines endpoint if it this is run outside of the Kubeflow cluster.\n", + " host=None # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.\n", ").create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})" ] }