diff --git a/samples/kubeflow-tf/kubeflow-training-classification.py b/samples/kubeflow-tf/kubeflow-training-classification.py index c01a1ddcb73..45170e762cf 100755 --- a/samples/kubeflow-tf/kubeflow-training-classification.py +++ b/samples/kubeflow-tf/kubeflow-training-classification.py @@ -32,7 +32,7 @@ def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', sc '--output', transform_output, ], file_outputs = {'transformed': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def kubeflow_tf_training_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'): @@ -50,7 +50,7 @@ def kubeflow_tf_training_op(transformed_data_dir, schema: 'GcsUri[text/json]', l '--job-dir', training_output, ], file_outputs = {'train': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'): return dsl.ContainerOp( @@ -66,7 +66,7 @@ def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json] '--output', prediction_output, ], file_outputs = {'prediction': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def confusion_matrix_op(predictions, output, step_name='confusionmatrix'): return dsl.ContainerOp( @@ -76,7 +76,7 @@ def confusion_matrix_op(predictions, output, step_name='confusionmatrix'): '--predictions', predictions, '--output', output, ] - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) @dsl.pipeline( name='Pipeline TFJob', @@ -97,10 +97,10 @@ def kubeflow_training(output, project, # TODO: use the argo job name as the workflow workflow = '{{workflow.name}}' - preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, '', '%s/%s/transformed' % (output, workflow)) - training = kubeflow_tf_training_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, '', '%s/%s/train' % (output, workflow)) - prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, '%s/%s/predict' % (output, workflow)) - confusion_matrix = confusion_matrix_op(prediction.output, '%s/%s/confusionmatrix' % (output, workflow)) + preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, '', '%s/%s/transformed' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa')) + training = kubeflow_tf_training_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, '', '%s/%s/train' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa')) + prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, '%s/%s/predict' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa')) + confusion_matrix = confusion_matrix_op(prediction.output, '%s/%s/confusionmatrix' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa')) if __name__ == '__main__': diff --git a/samples/tfx/taxi-cab-classification-pipeline.py b/samples/tfx/taxi-cab-classification-pipeline.py index d2ba230d935..417abf11654 100755 --- a/samples/tfx/taxi-cab-classification-pipeline.py +++ b/samples/tfx/taxi-cab-classification-pipeline.py @@ -35,7 +35,7 @@ def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'G 'schema': '/schema.txt', 'validation': '/output_validation_result.txt', } - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'): return dsl.ContainerOp( @@ -51,7 +51,7 @@ def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', sc '--output', '%s/{{workflow.name}}/transformed' % transform_output, ], file_outputs = {'transformed': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'): @@ -69,7 +69,7 @@ def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate '--job-dir', '%s/{{workflow.name}}/train' % training_output, ], file_outputs = {'train': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'): return dsl.ContainerOp( @@ -85,7 +85,7 @@ def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'Gc '--output', '%s/{{workflow.name}}/analysis' % analysis_output, ], file_outputs = {'analysis': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'): @@ -102,7 +102,7 @@ def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json] '--output', '%s/{{workflow.name}}/predict' % prediction_output, ], file_outputs = {'prediction': '/output.txt'} - ).apply(gcp.use_gcp_secret('user-gcp-sa')) + ) def confusion_matrix_op(predictions: 'GcsUri', output: 'GcsUri', step_name='confusion_matrix'): @@ -157,18 +157,18 @@ def taxi_cab_classification( analyze_slice_column='trip_start_hour'): tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}' - validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, mode, output) + validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, mode, output).apply(gcp.use_gcp_secret('user-gcp-sa')) preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'], - project, mode, preprocess_module, output) + project, mode, preprocess_module, output).apply(gcp.use_gcp_secret('user-gcp-sa')) training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, - hidden_layer_size, steps, 'tips', preprocess_module, output) + hidden_layer_size, steps, 'tips', preprocess_module, output).apply(gcp.use_gcp_secret('user-gcp-sa')) analysis = dataflow_tf_model_analyze_op(training.output, evaluation, - validation.outputs['schema'], project, mode, analyze_slice_column, output) + validation.outputs['schema'], project, mode, analyze_slice_column, output).apply(gcp.use_gcp_secret('user-gcp-sa')) prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], 'tips', - training.output, mode, project, output) - cm = confusion_matrix_op(prediction.output, output) - roc = roc_op(prediction.output, output) - deploy = kubeflow_deploy_op(training.output, tf_server_name) + training.output, mode, project, output).apply(gcp.use_gcp_secret('user-gcp-sa')) + cm = confusion_matrix_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa')) + roc = roc_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa')) + deploy = kubeflow_deploy_op(training.output, tf_server_name).apply(gcp.use_gcp_secret('user-gcp-sa')) if __name__ == '__main__': import kfp.compiler as compiler