Skip to content

Commit

Permalink
TFX Iris sample (kubeflow#3119)
Browse files Browse the repository at this point in the history
* init

* update comment

* fix module file

* clean up

* update to beam sample

* add doc of default bucket

* bump viz server tfma version

* update iris sample to keras native version

* update iris sample to keras native version

* pin TFMA

* add readme

* add to sample test corpus

* add prebuilt && update some config

* sync frontend

* update snapshot

* update snapshot

* fix gettingstarted page

* fix unit test

* fix unit test

* update description

* update some comments

* add some dependencies.
  • Loading branch information
Jiaxiao Zheng authored and Jeffwan committed Dec 9, 2020
1 parent e28b239 commit 87d91bd
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 11 deletions.
2 changes: 1 addition & 1 deletion backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ tensorboard==2.1.0 # via tensorflow
tensorflow-data-validation==0.21.4 # via tfx
tensorflow-estimator==2.1.0 # via tensorflow
tensorflow-metadata==0.21.1 # via tensorflow-data-validation, tensorflow-model-analysis, tensorflow-transform, tfx-bsl
tensorflow-model-analysis==0.21.4 # via tfx
tensorflow-model-analysis==0.21.5 # via -r requirements.in (line 2), tfx
tensorflow-serving-api==2.1.0 # via tfx, tfx-bsl
tensorflow-transform==0.21.2 # via tensorflow-data-validation, tfx
tensorflow==2.1.0 # via ml-metadata, tensorflow-data-validation, tensorflow-model-analysis, tensorflow-serving-api, tensorflow-transform, tfx, tfx-bsl
Expand Down
9 changes: 7 additions & 2 deletions backend/src/apiserver/config/sample_config.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[
{
"name": "[Demo] XGBoost - Training with Confusion Matrix",
"name": "[Demo] XGBoost - Training with confusion matrix",
"description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm) [GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm#requirements). A trainer that does end-to-end distributed training for XGBoost models.",
"file": "/samples/core/xgboost_training_cm/xgboost_training_cm.py.yaml"
},
{
"name": "[Demo] TFX - Taxi Tip Prediction Model Trainer",
"name": "[Demo] TFX - Taxi tip prediction model trainer",
"description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/core/parameterized_tfx_oss) [GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/core/parameterized_tfx_oss#permission). Example pipeline that does classification with model analysis based on a public tax cab dataset.",
"file": "/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py.yaml"
},
Expand All @@ -18,5 +18,10 @@
"name": "[Tutorial] DSL - Control structures",
"description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/DSL%20-%20Control%20structures) Shows how to use conditional execution and exit handlers. This pipeline will randomly fail to demonstrate that the exit handler gets executed even in case of failure.",
"file": "/samples/tutorials/DSL - Control structures/DSL - Control structures.py.yaml"
},
{
"name": "[Demo] TFX - Iris classification pipeline",
"description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/core/iris). Example pipeline that classifies Iris flower subspecies and how to use native Keras within TFX.",
"file": "/samples/core/iris/iris.py.yaml"
}
]
7 changes: 4 additions & 3 deletions frontend/src/config/sample_config_from_backend.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[
"[Demo] XGBoost - Training with Confusion Matrix",
"[Demo] TFX - Taxi Tip Prediction Model Trainer",
"[Demo] XGBoost - Training with confusion matrix",
"[Demo] TFX - Taxi tip prediction model trainer",
"[Tutorial] Data passing in python components",
"[Tutorial] DSL - Control structures"
"[Tutorial] DSL - Control structures",
"[Demo] TFX - Iris classification pipeline"
]
4 changes: 2 additions & 2 deletions frontend/src/pages/__snapshots__/GettingStarted.test.tsx.snap
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ Array [
undefined,
10,
undefined,
"%7B%22predicates%22%3A%5B%7B%22key%22%3A%22name%22%2C%22op%22%3A%22EQUALS%22%2C%22string_value%22%3A%22%5BDemo%5D%20XGBoost%20-%20Training%20with%20Confusion%20Matrix%22%7D%5D%7D",
"%7B%22predicates%22%3A%5B%7B%22key%22%3A%22name%22%2C%22op%22%3A%22EQUALS%22%2C%22string_value%22%3A%22%5BDemo%5D%20XGBoost%20-%20Training%20with%20confusion%20matrix%22%7D%5D%7D",
],
Array [
undefined,
10,
undefined,
"%7B%22predicates%22%3A%5B%7B%22key%22%3A%22name%22%2C%22op%22%3A%22EQUALS%22%2C%22string_value%22%3A%22%5BDemo%5D%20TFX%20-%20Taxi%20Tip%20Prediction%20Model%20Trainer%22%7D%5D%7D",
"%7B%22predicates%22%3A%5B%7B%22key%22%3A%22name%22%2C%22op%22%3A%22EQUALS%22%2C%22string_value%22%3A%22%5BDemo%5D%20TFX%20-%20Taxi%20tip%20prediction%20model%20trainer%22%7D%5D%7D",
],
Array [
undefined,
Expand Down
31 changes: 31 additions & 0 deletions samples/core/iris/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Overview
[Tensorflow Extended (TFX)](https://github.com/tensorflow/tfx) is a Google-production-scale machine
learning platform based on TensorFlow. It provides a configuration framework to express ML pipelines
consisting of TFX components. Kubeflow Pipelines can be used as the orchestrator supporting the
execution of a TFX pipeline.

This directory contains a sample that demonstrate how to author a ML pipeline
to solve the famous [iris flower classification problem](https://www.kaggle.com/arshid/iris-flower-dataset)
in TFX and run it on a KFP deployment. Specifically it highlights the following
functionalities:

1. Support of [Keras](https://keras.io/) API;
2. Use [TFMA](https://github.com/tensorflow/model-analysis) for model validation;
3. Warm-start training by Resolver.

# Compilation
In order to successfully compile the Python sample, it is recommended to use
`tfx>=0.21.2`.

# Permission

> :warning: If you are using **full-scope** or **workload identity enabled** cluster in hosted pipeline beta version, **DO NOT** follow this section. However you'll still need to enable corresponding GCP API.
This pipeline requires Google Cloud Storage permission to run.
If KFP was deployed through K8S marketplace, please follow instructions in
[the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)
to make sure the service account has `storage.admin` role.
If KFP was deployed through
[standalone deployment](https://github.com/kubeflow/pipelines/tree/master/manifests/kustomize)
please refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/)
to provide `storage.admin` permission.
210 changes: 210 additions & 0 deletions samples/core/iris/iris.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#!/usr/bin/env python3
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Iris flowers example using TFX. Based on https://github.com/tensorflow/tfx/blob/master/tfx/examples/iris/iris_pipeline_native_keras.py"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import kfp
from typing import Text

import absl
import tensorflow_model_analysis as tfma

from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import data_types
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import trainer_pb2
from tfx.proto import pusher_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input

_pipeline_name = 'iris_native_keras'

# This example assumes that Iris flowers data is stored in GCS and the
# utility function is in iris_utils.py. Feel free to customize as needed.
_data_root_param = data_types.RuntimeParameter(
name='data-root',
default='gs://ml-pipeline-playground/iris/data',
ptype=Text,
)

# Python module file to inject customized logic into the TFX components. The
# Transform and Trainer both require user-defined functions to run successfully.
# This file is fork from https://github.com/tensorflow/tfx/blob/master/tfx/examples/iris/iris_utils_native_keras.py
_module_file_param = data_types.RuntimeParameter(
name='module-file',
default=
'gs://ml-pipeline-playground/iris/modules/iris_utils_native_keras.py',
ptype=Text,
)

# Directory and data locations. This example assumes all of the flowers
# example code and metadata library is relative to a GCS path.
# Note: if one deployed KFP from GKE marketplace, it's possible to leverage
# the following magic placeholder to auto-populate the default GCS bucket
# associated with KFP deployment. Otherwise you'll need to replace it with your
# actual bucket name here or when creating a run.
_pipeline_root = os.path.join(
'gs://{{kfp-default-bucket}}', 'tfx_iris', kfp.dsl.RUN_ID_PLACEHOLDER
)


def _create_pipeline(
pipeline_name: Text, pipeline_root: Text
) -> pipeline.Pipeline:
"""Implements the Iris flowers pipeline with TFX."""
examples = external_input(_data_root_param)

# Brings data into the pipeline or otherwise joins/converts training data.
example_gen = CsvExampleGen(input=examples)

# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

# Generates schema based on statistics files.
infer_schema = SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True
)

# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=infer_schema.outputs['schema']
)

# Performs transformations and feature engineering in training and serving.
transform = Transform(
examples=example_gen.outputs['examples'],
schema=infer_schema.outputs['schema'],
module_file=_module_file_param
)

# Uses user-provided Python function that implements a model using Keras.
trainer = Trainer(
module_file=_module_file_param,
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=infer_schema.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=100),
eval_args=trainer_pb2.EvalArgs(num_steps=50)
)

# Get the latest blessed model for model validation.
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing)
)

# Uses TFMA to compute an evaluation statistics over features of a model and
# perform quality validation of a candidate model (compared to a baseline).
# Note: to compile this successfully you'll need TFMA at >= 0.21.5
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(name='candidate', label_key='variety'),
tfma.ModelSpec(
name='baseline', label_key='variety', is_baseline=True
)
],
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(
metrics=[
tfma.MetricConfig(
class_name='SparseCategoricalAccuracy',
threshold=tfma.config.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.9}
),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-10}
)
)
)
]
)
]
)

# Uses TFMA to compute a evaluation statistics over features of a model.
model_analyzer = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config
)

# Checks whether the model passed the validation steps and pushes the model
# to a file destination if check passed.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=model_analyzer.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=os.path.
join(str(pipeline.ROOT_PARAMETER), 'model_serving')
)
)
)

return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[
example_gen, statistics_gen, infer_schema, validate_stats, transform,
trainer, model_resolver, model_analyzer, pusher
],
enable_cache=True,
)


if __name__ == '__main__':
absl.logging.set_verbosity(absl.logging.INFO)
# Make sure the version of TFX image used is consistent with the version of
# TFX SDK. Here we use tfx:0.21.2 image.
config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
kubeflow_metadata_config=kubeflow_dag_runner.
get_default_kubeflow_metadata_config(),
tfx_image='tensorflow/tfx:0.21.2',
)
kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(
output_filename=__file__ + '.yaml', config=config
)
kfp_runner.run(
_create_pipeline(
pipeline_name=_pipeline_name, pipeline_root=_pipeline_root
)
)
2 changes: 1 addition & 1 deletion test/sample-test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM google/cloud-sdk:279.0.0

RUN apt-get update -y
RUN apt-get install --no-install-recommends -y -q libssl-dev libffi-dev wget ssh
RUN apt-get install --no-install-recommends -y -q default-jre default-jdk python3-setuptools python3.7-dev gcc
RUN apt-get install --no-install-recommends -y -q default-jre default-jdk python3-setuptools python3.7-dev gcc libpython3.7-dev zlib1g-dev

RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py

Expand Down
18 changes: 18 additions & 0 deletions test/sample-test/configs/iris.config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test_name: iris
arguments:
output:
run_pipeline: True
4 changes: 2 additions & 2 deletions test/sample-test/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ google-pasta==0.1.8 # via tensorflow
google-resumable-media==0.4.1 # via google-cloud-bigquery, google-cloud-storage
googleapis-common-protos[grpc]==1.51.0 # via google-api-core, grpc-google-iam-v1, tensorflow-metadata
grpc-google-iam-v1==0.12.3 # via google-cloud-bigtable, google-cloud-pubsub
grpcio==1.27.1 # via apache-beam, google-api-core, googleapis-common-protos, grpc-google-iam-v1, tensorboard, tensorflow, tensorflow-serving-api, tfx
grpcio==1.27.1 # via apache-beam, google-api-core, grpc-google-iam-v1, tensorboard, tensorflow, tensorflow-serving-api, tfx
h5py==2.10.0 # via keras-applications
hdfs==2.5.8 # via apache-beam
httplib2==0.12.0 # via apache-beam, google-api-python-client, google-apitools, google-auth-httplib2, oauth2client
Expand Down Expand Up @@ -117,7 +117,7 @@ tensorboard==2.1.0 # via tensorflow
tensorflow-data-validation==0.21.4 # via tfx
tensorflow-estimator==2.1.0 # via tensorflow
tensorflow-metadata==0.21.1 # via tensorflow-data-validation, tensorflow-model-analysis, tensorflow-transform, tfx-bsl
tensorflow-model-analysis==0.21.4 # via tfx
tensorflow-model-analysis==0.21.5 # via -r requirements.in (line 13), tfx
tensorflow-serving-api==2.1.0 # via tfx, tfx-bsl
tensorflow-transform==0.21.2 # via tensorflow-data-validation, tfx
tensorflow==2.1.0 # via ml-metadata, tensorflow-data-validation, tensorflow-model-analysis, tensorflow-serving-api, tensorflow-transform, tfx, tfx-bsl
Expand Down
5 changes: 5 additions & 0 deletions test/sample-test/run_sample_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def run(self):
self._test_args['output'],
'tfx_taxi_simple_' + kfp.dsl.RUN_ID_PLACEHOLDER)
del self._test_args['output']
if self._testname == 'iris':
self._test_args['pipeline-root'] = os.path.join(
self._test_args['output'],
'tfx_iris_' + kfp.dsl.RUN_ID_PLACEHOLDER)
del self._test_args['output']

# Submit for pipeline running.
if self._run_pipeline:
Expand Down
1 change: 1 addition & 0 deletions test/sample_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ spec:
- multiple_outputs
- ai_platform
- parameterized_tfx_oss
- iris
# Build and push image
- name: build-image-by-dockerfile
retryStrategy:
Expand Down

0 comments on commit 87d91bd

Please sign in to comment.