Skip to content

Commit

Permalink
Improve TFX Taxi Sample and Components. (#518)
Browse files Browse the repository at this point in the history
* Improve TFX Taxi Sample and Components.

- Confusion matrix and ROC components support flexible target.
- Use single mode instead of 4 different modes for TFDV, TFMA, TFT, Predicton.
- TFDV output's schema path directly.
- Add Confusion Matrix and ROC components to tf taxi sample.

* Follow up code review comments.
  • Loading branch information
qimingj authored and k8s-ci-robot committed Dec 11, 2018
1 parent e64a766 commit 0901f1e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 30 deletions.
7 changes: 4 additions & 3 deletions components/dataflow/tfdv/src/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ def run_validator(output_dir, column_names, key_columns, csv_data_file,
schema, column_names, key_columns)
with open('/output_schema.json', 'w+') as f:
json.dump(schema_json, f)
with file_io.FileIO(os.path.join(output_dir, 'schema.json'), 'w+') as f:
schema_json_file = os.path.join(output_dir, 'schema.json')
with file_io.FileIO(schema_json_file, 'w+') as f:
logging.getLogger().info('Writing JSON schema to {}'.format(f.name))
json.dump(schema_json, f)
with open('/schema.txt', 'w+') as f:
f.write(schema_json_file)

if not csv_data_file_to_validate:
return
Expand Down Expand Up @@ -192,8 +195,6 @@ def main():
args.csv_data_for_inference,
args.csv_data_to_validate,
args.project, args.mode)
with open('/output.txt', 'w+') as f:
f.write(args.output)


if __name__ == "__main__":
Expand Down
14 changes: 14 additions & 0 deletions components/local/confusion_matrix/src/confusion_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def main(argv=None):
parser = argparse.ArgumentParser(description='ML Trainer')
parser.add_argument('--predictions', type=str, help='GCS path of prediction file pattern.')
parser.add_argument('--output', type=str, help='GCS path of the output directory.')
parser.add_argument('--target_lambda', type=str,
help='a lambda function as a string to compute target.' +
'For example, "lambda x: x[\'a\'] + x[\'b\']"' +
'If not set, the input must include a "target" column.')
args = parser.parse_args()

schema_file = os.path.join(os.path.dirname(args.predictions), 'schema.json')
Expand All @@ -46,6 +50,16 @@ def main(argv=None):
dfs.append(pd.read_csv(f, names=names))

df = pd.concat(dfs)
if args.target_lambda:
df['target'] = df.apply(eval(args.target_lambda), axis=1)

# Convert "True" to "True_" and "False" to "False_" for frontend to work.
# TODO: Investigate frontend handling of boolean values.
# https://github.com/kubeflow/pipelines/issues/446
convert_fn = lambda x: str(x) + '_' if str(x).lower() in ['true', 'false'] else x
df['target'] = df['target'].apply(convert_fn)
df['predicted'] = df['predicted'].apply(convert_fn)

vocab = list(df['target'].unique())
cm = confusion_matrix(df['target'], df['predicted'], labels=vocab)
data = []
Expand Down
12 changes: 11 additions & 1 deletion components/local/roc/src/roc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ def main(argv=None):
parser = argparse.ArgumentParser(description='ML Trainer')
parser.add_argument('--predictions', type=str, help='GCS path of prediction file pattern.')
parser.add_argument('--trueclass', type=str, help='The name of the class as true value.')
parser.add_argument('--target_lambda', type=str,
help='a lambda function as a string to determine positive or negative.' +
'For example, "lambda x: x[\'a\'] and x[\'b\']". If missing, ' +
'trueclass must be set and input must have a "target" column.')
parser.add_argument('--output', type=str, help='GCS path of the output directory.')
args = parser.parse_args()

if not args.target_lambda and not args.trueclass:
raise ValueError('Either target_lambda or trueclass must be set.')

schema_file = os.path.join(os.path.dirname(args.predictions), 'schema.json')
schema = json.loads(file_io.read_file_to_string(schema_file))
names = [x['name'] for x in schema]
Expand All @@ -46,7 +53,10 @@ def main(argv=None):
dfs.append(pd.read_csv(f, names=names))

df = pd.concat(dfs)
df['target'] = df['target'].apply(lambda x: 1 if x == args.trueclass else 0)
if args.target_lambda:
df['target'] = df.apply(eval(args.target_lambda), axis=1)
else:
df['target'] = df['target'].apply(lambda x: 1 if x == args.trueclass else 0)
fpr, tpr, thresholds = roc_curve(df['target'], df[args.trueclass])
df_roc = pd.DataFrame({'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds})
roc_file = os.path.join(args.output, 'roc.csv')
Expand Down
69 changes: 43 additions & 26 deletions samples/tfx/taxi-cab-classification-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):
return dsl.ContainerOp(
name = step_name,
image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2', #TODO-release: update the release tag for the next release
image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:dev', #TODO-release: update the release tag for the next release
arguments = [
'--csv-data-for-inference', inference_data,
'--csv-data-to-validate', validation_data,
'--column-names', column_names,
'--key-columns', key_columns,
'--project', project,
'--mode', mode,
'--output', validation_output,
'--output', '%s/{{workflow.name}}/validation' % validation_output,
],
file_outputs = {
'output': '/output.txt',
'schema': '/output_schema.json',
'schema': '/schema.txt',
'validation': '/output_validation_result.txt',
}
).apply(gcp.use_gcp_secret('user-gcp-sa'))

Expand All @@ -48,7 +48,7 @@ def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', sc
'--project', project,
'--mode', preprocess_mode,
'--preprocessing-module', preprocess_module,
'--output', transform_output,
'--output', '%s/{{workflow.name}}/transformed' % transform_output,
],
file_outputs = {'transformed': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
Expand All @@ -66,7 +66,7 @@ def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate
'--steps', steps,
'--target', target,
'--preprocessing-module', preprocess_module,
'--job-dir', training_output,
'--job-dir', '%s/{{workflow.name}}/train' % training_output,
],
file_outputs = {'train': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
Expand All @@ -82,7 +82,7 @@ def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'Gc
'--project', project,
'--mode', analyze_mode,
'--slice-columns', analyze_slice_column,
'--output', analysis_output,
'--output', '%s/{{workflow.name}}/analysis' % analysis_output,
],
file_outputs = {'analysis': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
Expand All @@ -99,11 +99,34 @@ def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]
'--model', model,
'--mode', predict_mode,
'--project', project,
'--output', prediction_output,
'--output', '%s/{{workflow.name}}/predict' % prediction_output,
],
file_outputs = {'prediction': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))


def confusion_matrix_op(predictions: 'GcsUri', output: 'GcsUri', step_name='confusion_matrix'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:dev', #TODO-release: update the release tag for the next release
arguments=[
'--output', '%s/{{workflow.name}}/confusionmatrix' % output,
'--predictions', predictions,
'--target_lambda', """lambda x: (x['target'] > x['fare'] * 0.2)""",
])


def roc_op(predictions: 'GcsUri', output: 'GcsUri', step_name='roc'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-roc:dev', #TODO-release: update the release tag for the next release
arguments=[
'--output', '%s/{{workflow.name}}/roc' % output,
'--predictions', predictions,
'--target_lambda', """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0""",
])


def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):
return dsl.ContainerOp(
name = step_name,
Expand All @@ -126,31 +149,25 @@ def taxi_cab_classification(
key_columns='trip_start_timestamp',
train='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv',
evaluation='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv',
validation_mode='local',
preprocess_mode='local',
mode='local',
preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',
target='tips',
learning_rate=0.1,
hidden_layer_size='1500',
steps=3000,
predict_mode='local',
analyze_mode='local',
analyze_slice_column='trip_start_hour'):

validation_output = '%s/{{workflow.name}}/validation' % output
transform_output = '%s/{{workflow.name}}/transformed' % output
training_output = '%s/{{workflow.name}}/train' % output
analysis_output = '%s/{{workflow.name}}/analysis' % output
prediction_output = '%s/{{workflow.name}}/predict' % output
tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'

validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, validation_mode, validation_output)
schema = '%s/schema.json' % validation.outputs['output']

preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output)
training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output)
analysis = dataflow_tf_model_analyze_op(training.output, evaluation, schema, project, analyze_mode, analyze_slice_column, analysis_output)
prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output)
validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, mode, output)
preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'],
project, mode, preprocess_module, output)
training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate,
hidden_layer_size, steps, 'tips', preprocess_module, output)
analysis = dataflow_tf_model_analyze_op(training.output, evaluation,
validation.outputs['schema'], project, mode, analyze_slice_column, output)
prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], 'tips',
training.output, mode, project, output)
cm = confusion_matrix_op(prediction.output, output)
roc = roc_op(prediction.output, output)
deploy = kubeflow_deploy_op(training.output, tf_server_name)

if __name__ == '__main__':
Expand Down

0 comments on commit 0901f1e

Please sign in to comment.