Modernize samples/core/tfx-oss Notebook #2618

Merged · Nov 19, 2019 · 6 commits
samples/core/tfx-oss/TFX Example.ipynb (263 changes: 24 additions & 239 deletions)
@@ -14,11 +14,13 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!pip3 install 'tfx==0.14.0' --upgrade\n",
"!python3 -m pip install 'kfp>=0.1.31' --quiet\n"
"!pip3 install 'tfx==0.15.0' --upgrade\n",
"!python3 -m pip install 'kfp>=0.1.35' --quiet"
]
},
{
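Note: after the pinned installs above, it's worth confirming the kernel actually picked up the upgraded packages before continuing. A minimal sanity check (a sketch; restart the kernel after installing so the new versions are the ones imported):

```python
# Confirm the installed versions match the pins above (a sketch).
import pkg_resources

print(pkg_resources.get_distribution('tfx').version)  # expect 0.15.0
print(pkg_resources.get_distribution('kfp').version)  # expect >= 0.1.35
```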
@@ -40,6 +42,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"tags": [
"parameters"
]
@@ -59,11 +62,13 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# copy the trainer code to a storage bucket as the TFX pipeline will need that code file in GCS\n",
"from tensorflow import gfile\n",
"from tensorflow.compat.v1 import gfile\n",
"gfile.Copy('utils/taxi_utils.py', _input_bucket + '/taxi_utils.py')"
]
},
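Note: the import moves to `tensorflow.compat.v1.gfile` because TF 2.x removed the top-level `tensorflow.gfile` module; the compat path keeps the existing `gfile.Copy` call working unchanged. The TF 2.x-native equivalent would be `tf.io.gfile.copy`, sketched below (the bucket value is illustrative, standing in for the notebook's `_input_bucket`):

```python
# Equivalent copy via the TF 2.x-native filesystem API (a sketch).
import tensorflow as tf

_input_bucket = 'gs://my-bucket'  # hypothetical value for illustration
tf.io.gfile.copy('utils/taxi_utils.py',
                 _input_bucket + '/taxi_utils.py',
                 overwrite=True)
```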
@@ -75,7 +80,7 @@
"\n",
"Reload this cell by running the load command to get the pipeline configuration file\n",
"```\n",
"%load tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow.py\n",
"%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py\n",
"```\n",
"\n",
"Configure:\n",
@@ -89,179 +94,12 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"\"\"\"Chicago Taxi example using TFX DSL on Kubeflow.\"\"\"\n",
"\n",
"from __future__ import absolute_import\n",
"from __future__ import division\n",
"from __future__ import print_function\n",
"\n",
"import os\n",
"from tfx.components.evaluator.component import Evaluator\n",
"from tfx.components.example_gen.big_query_example_gen.component import BigQueryExampleGen\n",
"from tfx.components.example_validator.component import ExampleValidator\n",
"from tfx.components.model_validator.component import ModelValidator\n",
"from tfx.components.pusher.component import Pusher\n",
"from tfx.components.schema_gen.component import SchemaGen\n",
"from tfx.components.statistics_gen.component import StatisticsGen\n",
"from tfx.components.trainer.component import Trainer\n",
"from tfx.components.transform.component import Transform\n",
"from tfx.orchestration.kubeflow.runner import KubeflowDagRunner\n",
"from tfx.orchestration.pipeline import PipelineDecorator\n",
"from tfx.proto import evaluator_pb2\n",
"from tfx.proto import pusher_pb2\n",
"from tfx.proto import trainer_pb2\n",
"\n",
"# Python module file to inject customized logic into the TFX components. The\n",
"# Transform and Trainer both require user-defined functions to run successfully.\n",
"# Copy this from the current directory to a GCS bucket and update the location\n",
"# below.\n",
"_taxi_utils = os.path.join(_input_bucket, 'taxi_utils.py')\n",
"\n",
"# Path which can be listened to by the model server. Pusher will output the\n",
"# trained model here.\n",
"_serving_model_dir = os.path.join(_output_bucket, 'serving_model/taxi_bigquery')\n",
"\n",
"# Region to use for Dataflow jobs and CMLE training.\n",
"# Dataflow: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\n",
"# CMLE: https://cloud.google.com/ml-engine/docs/tensorflow/regions\n",
"_gcp_region = 'us-central1'\n",
"\n",
"# A dict which contains the training job parameters to be passed to Google\n",
"# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n",
"# Engine, refer to\n",
"# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job\n",
"_cmle_training_args = {\n",
" 'pythonModule': None, # Will be populated by TFX\n",
" 'args': None, # Will be populated by TFX\n",
" 'region': _gcp_region,\n",
" 'jobDir': os.path.join(_output_bucket, 'tmp'),\n",
" 'runtimeVersion': '1.12',\n",
" 'pythonVersion': '2.7',\n",
" 'project': _project_id,\n",
"}\n",
"\n",
"# A dict which contains the serving job parameters to be passed to Google\n",
"# Cloud ML Engine. For the full set of parameters supported by Google Cloud ML\n",
"# Engine, refer to\n",
"# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models\n",
"_cmle_serving_args = {\n",
" 'model_name': 'chicago_taxi',\n",
" 'project_id': _project_id,\n",
" 'runtime_version': '1.12',\n",
"}\n",
"\n",
"# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.\n",
"# The full taxi dataset is > 120M record. In the interest of resource\n",
"# savings and time, we've set the default for this example to be much smaller.\n",
"# Feel free to crank it up and process the full dataset!\n",
"_query_sample_rate = 0.001 # Generate a 0.1% random sample.\n",
"\n",
"\n",
"# TODO(zhitaoli): Remove PipelineDecorator after 0.13.0.\n",
"@PipelineDecorator(\n",
" pipeline_name='chicago_taxi_pipeline_kubeflow',\n",
" log_root='/var/tmp/tfx/logs',\n",
" pipeline_root=_pipeline_root,\n",
" additional_pipeline_args={\n",
" 'beam_pipeline_args': [\n",
" '--runner=DataflowRunner',\n",
" '--experiments=shuffle_mode=auto',\n",
" '--project=' + _project_id,\n",
" '--temp_location=' + os.path.join(_output_bucket, 'tmp'),\n",
" '--region=' + _gcp_region,\n",
" ],\n",
" # Optional args:\n",
" # 'tfx_image': custom docker image to use for components. This is needed\n",
" # if TFX package is not installed from an RC or released version.\n",
" })\n",
"def _create_pipeline():\n",
" \"\"\"Implements the chicago taxi pipeline with TFX.\"\"\"\n",
"\n",
" query = \"\"\"\n",
" SELECT\n",
" pickup_community_area,\n",
" fare,\n",
" EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,\n",
" EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,\n",
" EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,\n",
" UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,\n",
" pickup_latitude,\n",
" pickup_longitude,\n",
" dropoff_latitude,\n",
" dropoff_longitude,\n",
" trip_miles,\n",
" pickup_census_tract,\n",
" dropoff_census_tract,\n",
" payment_type,\n",
" company,\n",
" trip_seconds,\n",
" dropoff_community_area,\n",
" tips\n",
" FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`\n",
" WHERE RAND() < {}\"\"\".format(_query_sample_rate)\n",
"\n",
" # Brings data into the pipeline or otherwise joins/converts training data.\n",
" example_gen = BigQueryExampleGen(query=query)\n",
"\n",
" # Computes statistics over data for visualization and example validation.\n",
" statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)\n",
"\n",
" # Generates schema based on statistics files.\n",
" infer_schema = SchemaGen(stats=statistics_gen.outputs.output)\n",
"\n",
" # Performs anomaly detection based on statistics and data schema.\n",
" validate_stats = ExampleValidator(\n",
" stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)\n",
"\n",
" # Performs transformations and feature engineering in training and serving.\n",
" transform = Transform(\n",
" input_data=example_gen.outputs.examples,\n",
" schema=infer_schema.outputs.output,\n",
" module_file=_taxi_utils)\n",
"\n",
" # Uses user-provided Python function that implements a model using TF-Learn.\n",
" trainer = Trainer(\n",
" module_file=_taxi_utils,\n",
" transformed_examples=transform.outputs.transformed_examples,\n",
" schema=infer_schema.outputs.output,\n",
" transform_output=transform.outputs.transform_output,\n",
" train_args=trainer_pb2.TrainArgs(num_steps=10000),\n",
" eval_args=trainer_pb2.EvalArgs(num_steps=5000),\n",
" custom_config={'cmle_training_args': _cmle_training_args})\n",
"\n",
" # Uses TFMA to compute a evaluation statistics over features of a model.\n",
" model_analyzer = Evaluator(\n",
" examples=example_gen.outputs.examples,\n",
" model_exports=trainer.outputs.output,\n",
" feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n",
" evaluator_pb2.SingleSlicingSpec(\n",
" column_for_slicing=['trip_start_hour'])\n",
" ]))\n",
"\n",
" # Performs quality validation of a candidate model (compared to a baseline).\n",
" model_validator = ModelValidator(\n",
" examples=example_gen.outputs.examples, model=trainer.outputs.output)\n",
"\n",
" # Checks whether the model passed the validation steps and pushes the model\n",
" # to a file destination if check passed.\n",
" pusher = Pusher(\n",
" model_export=trainer.outputs.output,\n",
" model_blessing=model_validator.outputs.blessing,\n",
" custom_config={'cmle_serving_args': _cmle_serving_args},\n",
" push_destination=pusher_pb2.PushDestination(\n",
" filesystem=pusher_pb2.PushDestination.Filesystem(\n",
" base_directory=_serving_model_dir)))\n",
"\n",
" return [\n",
" example_gen, statistics_gen, infer_schema, validate_stats, transform,\n",
" trainer, model_analyzer, model_validator, pusher\n",
" ]\n",
"\n",
"\n",
"pipeline = KubeflowDagRunner().run(_create_pipeline())"
"%load https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_kubeflow_gcp.py"
]
},
{
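Note: the inline DSL above is deleted in favor of `%load`-ing the maintained TFX example, which compiles the pipeline with the Kubeflow runner into the `chicago_taxi_pipeline_kubeflow.tar.gz` archive submitted in the next cell. A sketch of that compile step, assuming the TFX 0.15 module layout and a `pipeline` object defined by the loaded file (names vary between TFX releases):

```python
# Compile the TFX pipeline into a Kubeflow Pipelines package (a sketch).
from tfx.orchestration.kubeflow import kubeflow_dag_runner

kubeflow_dag_runner.KubeflowDagRunner().run(pipeline)
```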
@@ -274,69 +112,16 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"import kfp\n",
"run_result = kfp.Client().create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to the ML Metadata Store"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip3 install ml_metadata"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from ml_metadata.metadata_store import metadata_store\n",
"from ml_metadata.proto import metadata_store_pb2\n",
"import os\n",
"\n",
"connection_config = metadata_store_pb2.ConnectionConfig()\n",
"connection_config.mysql.host = os.getenv('MYSQL_SERVICE_HOST')\n",
"connection_config.mysql.port = int(os.getenv('MYSQL_SERVICE_PORT'))\n",
"connection_config.mysql.database = 'mlmetadata'\n",
"connection_config.mysql.user = 'root'\n",
"store = metadata_store.MetadataStore(connection_config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get all output artifacts\n",
"store.get_artifacts()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get a specific artifact type\n",
"\n",
"# TFX types \n",
"# types = ['ModelExportPath', 'ExamplesPath', 'ModelBlessingPath', 'ModelPushPath', 'TransformPath', 'SchemaPath']\n",
"\n",
"store.get_artifacts_by_type('ExamplesPath')"
"run_result = kfp.Client(\n",
" host=None # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.\n",
").create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})"
]
}
],
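Note: `kfp.Client()` with no arguments resolves the in-cluster Pipelines endpoint, which is why `host=None` works from a notebook running inside the Kubeflow cluster. From outside the cluster, pass the endpoint explicitly (a sketch; the host URL is an assumption, substitute your own deployment):

```python
# Submit the compiled pipeline from outside the cluster (a sketch).
import kfp

client = kfp.Client(host='https://<your-kfp-host>/pipeline')
run_result = client.create_run_from_pipeline_package(
    'chicago_taxi_pipeline_kubeflow.tar.gz', arguments={})
```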
@@ -356,15 +141,15 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
"version": "3.5.6"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
- }
+ },
+ "source": []
}
}
},