Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update samples/core/tfx-oss to tfx==0.14.0 and kfp=0.1.31 #2385

Merged
merged 4 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions samples/core/tfx-oss/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ then activate the environment.

Install TFX and Kubeflow Pipelines SDK
```
pip3 install tfx==0.13.0 --upgrade
pip3 install kfp --upgrade
pip3 install 'tfx==0.14.0' --upgrade
pip3 install 'kfp>=0.1.31' --upgrade
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be 'kfp>=0.1.31.2'?

Copy link
Contributor Author

@ucdmkt ucdmkt Oct 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compatibility issue in TFX's KubeflowDagRunner was only for test (which is fixed anyways tensorflow/tfx@ea85bfe#diff-642c05352d80030caf1d9d3fe41f9740), but there was no compatibility issue in orchestrator itself.

Is there any other reason to prefer 0.1.31.2 or higher?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.1.31.2 is just the current latest version.
Usually the latest version is the best, containing all the bug fixes.
The 0.1.32 will be released tomorrow which will have some more fixes.

```

Upload the utility code to your storage bucket. You can modify this code if needed for a different dataset.
Expand All @@ -37,11 +37,11 @@ gfile.Copy('utils/taxi_utils.py', 'gs://<my bucket>/<path>/taxi_utils.py')

## Configure the TFX Pipeline

Modify the pipeline configurations at
Modify the pipeline configurations at
```
TFX Example.ipynb
```
Configure
Configure
- Set `_input_bucket` to the GCS directory where you've copied taxi_utils.py. I.e. gs://<my bucket>/<path>/
- Set `_output_bucket` to the GCS directory where you want the results to be written
- Set GCP project ID (replace my-gcp-project). Note that it should be project ID, not project name.
Expand Down
4 changes: 2 additions & 2 deletions samples/core/tfx-oss/TFX Example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip3 install tfx==0.13.0 --upgrade\n",
"!pip3 install 'tfx==0.14.0' --upgrade\n",
"!python3 -m pip install 'kfp>=0.1.31' --quiet\n"
]
},
Expand Down Expand Up @@ -370,4 +370,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}
64 changes: 27 additions & 37 deletions samples/core/tfx-oss/utils/taxi_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,9 @@
from __future__ import division
from __future__ import print_function

import os

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.saved import saved_transform_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import schema_utils

# Categorical features are assumed to each have a maximum value in the dataset.
Expand Down Expand Up @@ -81,11 +76,11 @@ def _get_raw_feature_spec(schema):
return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn():
def _gzip_reader_fn(filenames):
"""Small utility returning a record reader that can read gzip'ed files."""
return tf.TFRecordReader(
options=tf.python_io.TFRecordOptions(
compression_type=tf.python_io.TFRecordCompressionType.GZIP))
return tf.data.TFRecordDataset(
filenames,
compression_type='GZIP')


def _fill_in_missing(x):
Expand Down Expand Up @@ -132,7 +127,8 @@ def preprocessing_fn(inputs):

for key in _BUCKET_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.bucketize(
_fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)
_fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
always_return_num_quantiles=False)

for key in _CATEGORICAL_FEATURE_KEYS:
outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])
Expand All @@ -154,7 +150,7 @@ def _build_estimator(config, hidden_units=None, warm_start_from=None):
"""Build an estimator for predicting the tipping behavior of taxi riders.

Args:
config: tf.contrib.learn.RunConfig defining the runtime environment for the
config: tf.estimator.RunConfig defining the runtime environment for the
estimator (including model_dir).
hidden_units: [int], the layer sizes of the DNN (input layer first)
warm_start_from: Optional directory to warm start from.
Expand Down Expand Up @@ -196,12 +192,11 @@ def _build_estimator(config, hidden_units=None, warm_start_from=None):
warm_start_from=warm_start_from)


def _example_serving_receiver_fn(transform_output, schema):
def _example_serving_receiver_fn(tf_transform_output, schema):
"""Build the serving in inputs.

Args:
transform_output: directory in which the tf-transform model was written
during the preprocessing step.
tf_transform_output: A TFTransformOutput.
schema: the schema of the input data.

Returns:
Expand All @@ -214,21 +209,18 @@ def _example_serving_receiver_fn(transform_output, schema):
raw_feature_spec, default_batch_size=None)
serving_input_receiver = raw_input_fn()

_, transformed_features = (
saved_transform_io.partially_apply_saved_transform(
os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
serving_input_receiver.features))
transformed_features = tf_transform_output.transform_raw_features(
serving_input_receiver.features)

return tf.estimator.export.ServingInputReceiver(
transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(transform_output, schema):
def _eval_input_receiver_fn(tf_transform_output, schema):
"""Build everything needed for the tf-model-analysis to run the model.

Args:
transform_output: directory in which the tf-transform model was written
during the preprocessing step.
tf_transform_output: A TFTransformOutput.
schema: the schema of the input data.

Returns:
Expand All @@ -250,10 +242,8 @@ def _eval_input_receiver_fn(transform_output, schema):

# Now that we have our raw examples, process them through the tf-transform
# function computed during the preprocessing step.
_, transformed_features = (
saved_transform_io.partially_apply_saved_transform(
os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
features))
transformed_features = tf_transform_output.transform_raw_features(
features)

# The key name MUST be 'examples'.
receiver_tensors = {'examples': serialized_tf_example}
Expand All @@ -268,27 +258,25 @@ def _eval_input_receiver_fn(transform_output, schema):
labels=transformed_features[_transformed_name(_LABEL_KEY)])


def _input_fn(filenames, transform_output, batch_size=200):
def _input_fn(filenames, tf_transform_output, batch_size=200):
"""Generates features and labels for training or evaluation.

Args:
filenames: [str] list of CSV files to read data from.
transform_output: directory in which the tf-transform model was written
during the preprocessing step.
tf_transform_output: A TFTransformOutput.
batch_size: int First dimension size of the Tensors returned by input_fn

Returns:
A (features, indices) tuple where features is a dictionary of
Tensors, and indices is a single Tensor of label indices.
"""
metadata_dir = os.path.join(transform_output,
transform_fn_io.TRANSFORMED_METADATA_DIR)
transformed_metadata = metadata_io.read_metadata(metadata_dir)
transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
transformed_feature_spec = (
tf_transform_output.transformed_feature_spec().copy())

transformed_features = tf.contrib.learn.io.read_batch_features(
dataset = tf.data.experimental.make_batched_features_dataset(
filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

transformed_features = dataset.make_one_shot_iterator().get_next()
# We pop the label because we do not want to use it as a feature while we're
# training.
return transformed_features, transformed_features.pop(
Expand Down Expand Up @@ -318,22 +306,24 @@ def trainer_fn(hparams, schema):
train_batch_size = 40
eval_batch_size = 40

tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

train_input_fn = lambda: _input_fn( # pylint: disable=g-long-lambda
hparams.train_files,
hparams.transform_output,
tf_transform_output,
batch_size=train_batch_size)

eval_input_fn = lambda: _input_fn( # pylint: disable=g-long-lambda
hparams.eval_files,
hparams.transform_output,
tf_transform_output,
batch_size=eval_batch_size)

train_spec = tf.estimator.TrainSpec( # pylint: disable=g-long-lambda
train_input_fn,
max_steps=hparams.train_steps)

serving_receiver_fn = lambda: _example_serving_receiver_fn( # pylint: disable=g-long-lambda
hparams.transform_output, schema)
tf_transform_output, schema)

exporter = tf.estimator.FinalExporter('chicago-taxi', serving_receiver_fn)
eval_spec = tf.estimator.EvalSpec(
Expand All @@ -358,7 +348,7 @@ def trainer_fn(hparams, schema):

# Create an input receiver for TFMA processing
receiver_fn = lambda: _eval_input_receiver_fn( # pylint: disable=g-long-lambda
hparams.transform_output, schema)
tf_transform_output, schema)

return {
'estimator': estimator,
Expand Down