diff --git a/components/dataflow/predict/component.yaml b/components/dataflow/predict/component.yaml
new file mode 100644
index 00000000000..0059dc4b993
--- /dev/null
+++ b/components/dataflow/predict/component.yaml
@@ -0,0 +1,31 @@
+name: Predict using TF on Dataflow
+description: |
+  Runs TensorFlow prediction on Google Cloud Dataflow.
+  Input and output data is in GCS.
+inputs:
+  - {name: Data file pattern, type: GCPPath, description: 'GCS or local path of test file patterns.'} # type: {GCSPath: {data_type: CSV}}
+  - {name: Schema, type: GCPPath, description: 'GCS path of the JSON schema file.'} # type: {GCSPath: {data_type: TFDV schema JSON}}
+  - {name: Target column, type: String, description: 'Name of the column for prediction target.'}
+  - {name: Model, type: GCPPath, description: 'GCS or local path of a model trained with tft-preprocessed data.'} # Models trained with estimator are exported to base/export/export/123456781 directory. # Our trainer exports only one model. #TODO: Output single model from trainer # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
+  - {name: Batch size, type: Integer, default: '32', description: 'Batch size used in prediction.'}
+  - {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".'}
+  - {name: GCP project, type: GcpProject, description: 'The GCP project to run the Dataflow job.'}
+  - {name: Predictions dir, type: GCPPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
+outputs:
+  - {name: Predictions dir, type: GCPPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/predict.py]
+    args: [
+      --data, {inputValue: Data file pattern},
+      --schema, {inputValue: Schema},
+      --target, {inputValue: Target column},
+      --model, {inputValue: Model},
+      --mode, {inputValue: Run mode},
+      --project, {inputValue: GCP project},
+      --batchsize, {inputValue: Batch size},
+      --output, {inputValue: Predictions dir},
+    ]
+    fileOutputs:
+      Predictions dir: /output.txt
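A minimal sketch (not part of this diff) of how a pipeline author consumes a component.yaml like the one above: input names in the YAML become pythonized keyword arguments on the generated task factory. The bucket, project, and column names below are placeholders.

```python
import kfp.dsl as dsl
from kfp import components

# Build a task factory from the component definition
# (a local path or the raw GitHub URL both work).
dataflow_tf_predict_op = components.load_component_from_file(
    'components/dataflow/predict/component.yaml')

@dsl.pipeline(name='predict-example', description='Uses the Dataflow predict component.')
def predict_pipeline(
    evaluation='gs://my-bucket/eval*.csv',    # placeholder
    schema='gs://my-bucket/schema.json',      # placeholder
    model='gs://my-bucket/training/export'):  # placeholder
  # "Data file pattern" -> data_file_pattern, "GCP project" -> gcp_project, etc.
  # "Batch size" has a default in the YAML, so it can be omitted.
  prediction = dataflow_tf_predict_op(
      data_file_pattern=evaluation,
      schema=schema,
      target_column='target',                  # placeholder column name
      model=model,
      run_mode='local',
      gcp_project='my-project',                # placeholder
      predictions_dir='gs://my-bucket/predict')  # placeholder
```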
diff --git a/components/dataflow/tfdv/component.yaml b/components/dataflow/tfdv/component.yaml
new file mode 100644
index 00000000000..63028ee795a
--- /dev/null
+++ b/components/dataflow/tfdv/component.yaml
@@ -0,0 +1,34 @@
+name: TFX - Data Validation
+description: |
+  Runs TensorFlow Data Validation. https://www.tensorflow.org/tfx/data_validation/get_started
+  TensorFlow Data Validation (TFDV) can analyze training and serving data to:
+  * compute descriptive statistics,
+  * infer a schema,
+  * detect data anomalies.
+inputs:
+- {name: Inference data, type: GCPPath, description: GCS path of the CSV file from which to infer the schema.} # type: {GCSPath: {data_type: CSV}}
+- {name: Validation data, type: GCPPath, description: GCS path of the CSV file whose contents should be validated.} # type: {GCSPath: {data_type: CSV}}
+- {name: Column names, type: GCPPath, description: GCS path of a JSON file containing the list of column names.} # type: {GCSPath: {data_type: JSON}}
+- {name: Key columns, type: String, description: Comma-separated list of columns to treat as keys.}
+- {name: GCP project, type: GcpProject, default: '', description: The GCP project to run the Dataflow job.}
+- {name: Run mode, type: String, default: local, description: Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".}
+- {name: Validation output, type: GCPPath, description: GCS or local directory.} # type: {GCSPath: {path_type: Directory}}
+outputs:
+- {name: Schema, type: GCPPath, description: GCS path of the inferred schema JSON.} # type: {GCSPath: {data_type: TFDV schema JSON}}
+- {name: Validation result, type: String, description: Indicates whether anomalies were detected or not.}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/validate.py]
+    args: [
+      --csv-data-for-inference, {inputValue: Inference data},
+      --csv-data-to-validate, {inputValue: Validation data},
+      --column-names, {inputValue: Column names},
+      --key-columns, {inputValue: Key columns},
+      --project, {inputValue: GCP project},
+      --mode, {inputValue: Run mode},
+      --output, {inputValue: Validation output},
+    ]
+    fileOutputs:
+      Schema: /schema.txt
+      Validation result: /output_validation_result.txt
\ No newline at end of file
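Unlike the single-output components elsewhere in this PR, TFDV declares two fileOutputs. A sketch (not part of the diff) of reading both downstream, inside a pipeline function — the factory name, paths, and key column are placeholders, and the output keys assume the SDK's usual name sanitization:

```python
# Assumes: tfdv_op = components.load_component_from_url('.../tfdv/component.yaml')
validation = tfdv_op(
    inference_data='gs://my-bucket/train.csv',        # placeholder
    validation_data='gs://my-bucket/eval.csv',        # placeholder
    column_names='gs://my-bucket/column-names.json',  # placeholder
    key_columns='key',                                # placeholder
    gcp_project='my-project',                         # placeholder
    run_mode='local',
    validation_output='gs://my-bucket/validation')    # placeholder

# Multi-output tasks are read through .outputs; the keys are the sanitized
# output names ("Schema" -> 'schema', "Validation result" -> 'validation_result').
schema = validation.outputs['schema']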
diff --git a/components/dataflow/tfma/component.yaml b/components/dataflow/tfma/component.yaml
new file mode 100644
index 00000000000..95797556e3a
--- /dev/null
+++ b/components/dataflow/tfma/component.yaml
@@ -0,0 +1,32 @@
+name: TFX - Analyze model
+description: |
+  Runs TensorFlow Model Analysis. https://www.tensorflow.org/tfx/model_analysis/get_started
+  TensorFlow Model Analysis allows you to perform model evaluations in the TFX pipeline and view the resulting metrics and plots in a Jupyter notebook. Specifically, it can provide:
+  * metrics computed on the entire training and holdout datasets, as well as on next-day evaluations
+  * tracking of metrics over time
+  * model quality performance on different feature slices
+inputs:
+- {name: Model, type: GCPPath, description: GCS path to the model which will be evaluated.} # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
+- {name: Evaluation data, type: GCPPath, description: GCS path of eval files.} # type: {GCSPath: {data_type: CSV}}
+- {name: Schema, type: GCPPath, description: GCS path of the JSON schema file.} # type: {GCSPath: {data_type: TFDV schema JSON}}
+- {name: Run mode, type: String, default: local, description: Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".}
+- {name: GCP project, type: GcpProject, default: '', description: 'The GCP project to run the Dataflow job in, when running in `cloud` mode.'}
+- {name: Slice columns, type: String, description: Comma-separated list of columns on which to slice for analysis.}
+- {name: Analysis results dir, type: GCPPath, description: GCS or local directory where the analysis results should be written.} # type: {GCSPath: {path_type: Directory}}
+outputs:
+- {name: Analysis results dir, type: GCPPath, description: GCS or local directory where the analysis results were written.} # type: {GCSPath: {path_type: Directory}}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/model_analysis.py]
+    args: [
+      --model, {inputValue: Model},
+      --eval, {inputValue: Evaluation data},
+      --schema, {inputValue: Schema},
+      --mode, {inputValue: Run mode},
+      --project, {inputValue: GCP project},
+      --slice-columns, {inputValue: Slice columns},
+      --output, {inputValue: Analysis results dir},
+    ]
+    fileOutputs:
+      Analysis results dir: /output.txt
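A pattern worth noting here (also used by the predict, TFT, and trainer components in this PR): the results directory is declared both as an input and as an output. The wrapper script echoes the directory path to /output.txt, so downstream steps can consume `task.output` without rebuilding the path. A sketch (not part of the diff), with placeholder values:

```python
# Assumes tfma_op was loaded from this component.yaml and `training` is an
# upstream trainer task; paths, project, and columns are placeholders.
analysis = tfma_op(
    model=training.output,
    evaluation_data='gs://my-bucket/eval.csv',
    schema='gs://my-bucket/schema.json',
    run_mode='local',
    gcp_project='my-project',
    slice_columns='column_a,column_b',
    analysis_results_dir='gs://my-bucket/analysis')

# analysis.output now carries the same directory path, read back from /output.txt,
# so later steps depending on it are correctly ordered by the DAG.
```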
diff --git a/components/dataflow/tft/component.yaml b/components/dataflow/tft/component.yaml
new file mode 100644
index 00000000000..1b9e642b198
--- /dev/null
+++ b/components/dataflow/tft/component.yaml
@@ -0,0 +1,27 @@
+name: Transform using TF on Dataflow
+description: Runs TensorFlow Transform on Google Cloud Dataflow
+inputs:
+  - {name: Training data file pattern, type: GCPPath, description: 'GCS path of train file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
+  - {name: Evaluation data file pattern, type: GCPPath, description: 'GCS path of eval file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
+  - {name: Schema, type: GCPPath, description: 'GCS path of the JSON schema file.'} # type: {GCSPath: {data_type: JSON}}
+  - {name: GCP project, type: GcpProject, description: 'The GCP project to run the Dataflow job.'}
+  - {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".'}
+  - {name: Preprocessing module, type: GCPPath, default: '', description: 'GCS path to a python file defining "preprocess" and "get_feature_columns" functions.'} # type: {GCSPath: {data_type: Python}}
+  - {name: Transformed data dir, type: GCPPath, description: 'GCS or local directory.'} #Also supports local paths # type: {GCSPath: {path_type: Directory}}
+outputs:
+  - {name: Transformed data dir, type: GCPPath} # type: {GCSPath: {path_type: Directory}}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/transform.py]
+    args: [
+      --train, {inputValue: Training data file pattern},
+      --eval, {inputValue: Evaluation data file pattern},
+      --schema, {inputValue: Schema},
+      --project, {inputValue: GCP project},
+      --mode, {inputValue: Run mode},
+      --preprocessing-module, {inputValue: Preprocessing module},
+      --output, {inputValue: Transformed data dir},
+    ]
+    fileOutputs:
+      Transformed data dir: /output.txt
diff --git a/components/kubeflow/deployer/component.yaml b/components/kubeflow/deployer/component.yaml
new file mode 100644
index 00000000000..95cce10936f
--- /dev/null
+++ b/components/kubeflow/deployer/component.yaml
@@ -0,0 +1,21 @@
+name: Kubeflow - Serve TF model
+description: Serve TensorFlow model using Kubeflow TF-serving
+inputs:
+  - {name: Model dir, type: GCPPath, description: 'Path of GCS directory containing exported TensorFlow model.'} # type: {GCSPath: {path_type: Directory}}
+  - {name: Cluster name, type: String, default: '', description: 'Kubernetes cluster name where the TF-serving service should be deployed. Uses the current cluster by default.'}
+  - {name: Namespace, type: String, default: 'kubeflow', description: 'Kubernetes namespace where the TF-serving service should be deployed.'}
+  - {name: Server name, type: String, default: 'model-server', description: 'TF-serving server name to use when deploying.'}
+  - {name: PVC name, type: String, default: '', description: 'Optional PersistentVolumeClaim to use.'}
+#outputs:
+#  - {name: Endpoint URI, type: Serving URI, description: 'URI of the deployed prediction service.'}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [/bin/deploy.sh]
+    args: [
+      --model-export-path, {inputValue: Model dir},
+      --cluster-name, {inputValue: Cluster name},
+      --namespace, {inputValue: Namespace},
+      --server-name, {inputValue: Server name},
+      --pvc-name, {inputValue: PVC name},
+    ]
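The deployer currently exposes no outputs (the endpoint output is commented out), so it usually sits at the end of a pipeline. A sketch (not part of the diff) of chaining it after the trainer — the factory name is illustrative, and the `export` subdirectory layout is an assumption about how the trainer lays out its output:

```python
# Assumes: deploy_op = components.load_component_from_url('.../deployer/component.yaml')
deploy = deploy_op(
    model_dir='%s/export' % training.output,  # assumed export layout
    namespace='kubeflow',
    server_name='model-server')

# With no output to consume, downstream ordering must be explicit if needed:
# some_later_task.after(deploy)
```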
diff --git a/components/kubeflow/dnntrainer/component.yaml b/components/kubeflow/dnntrainer/component.yaml
new file mode 100644
index 00000000000..fd8a77e8b6d
--- /dev/null
+++ b/components/kubeflow/dnntrainer/component.yaml
@@ -0,0 +1,33 @@
+name: Train FC DNN using TF
+description: Trains a fully-connected neural network using TensorFlow
+inputs:
+  - {name: Transformed data dir, type: GCPPath, description: 'GCS path containing tf-transformed training and eval data.'} # type: {GCSPath: {path_type: Directory}}
+  - {name: Schema, type: GCPPath, description: 'GCS path of the JSON schema file.'} # type: {GCSPath: {data_type: JSON}}
+  - {name: Learning rate, type: Float, default: '0.1', description: 'Learning rate for training.'}
+  - {name: Optimizer, type: String, default: 'Adagrad', description: 'Optimizer for training. Valid values are: Adam, SGD, Adagrad. If not provided, the tf.estimator default will be used.'}
+  - {name: Hidden layer size, type: String, default: '100', description: 'Comma-separated hidden layer sizes. For example "200,100,50".'}
+  - {name: Steps, type: Integer, description: 'Maximum number of training steps to perform. If unspecified, will honor epochs.'}
+  #- {name: Epochs, type: Integer, default: '', description: 'Maximum number of training data epochs on which to train. If both "steps" and "epochs" are specified, the training job will run for "steps" or "epochs", whichever occurs first.'}
+  - {name: Target, type: String, description: 'Name of the column for prediction target.'}
+  - {name: Preprocessing module, type: GCPPath, default: '', description: 'GCS path to a python file defining "preprocess" and "get_feature_columns" functions.'} # type: {GCSPath: {data_type: Python}}
+  - {name: Training output dir, type: GCPPath, description: 'GCS or local directory.'} # type: {GCSPath: {path_type: Directory}}
+outputs:
+  - {name: Training output dir, type: GCPPath, description: 'GCS or local directory.'} # type: {GCSPath: {path_type: Directory}}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, -m, trainer.task]
+    args: [
+      --transformed-data-dir, {inputValue: Transformed data dir},
+      --schema, {inputValue: Schema},
+      --learning-rate, {inputValue: Learning rate},
+      --optimizer, {inputValue: Optimizer},
+      --hidden-layer-size, {inputValue: Hidden layer size},
+      --steps, {inputValue: Steps},
+#      --epochs, {inputValue: Epochs},
+      --target, {inputValue: Target},
+      --preprocessing-module, {inputValue: Preprocessing module},
+      --job-dir, {inputValue: Training output dir},
+    ]
+    fileOutputs:
+      Training output dir: /output.txt
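Inputs with defaults in the YAML ("Learning rate", "Optimizer", "Hidden layer size", "Preprocessing module") become optional keyword arguments on the generated factory, so a minimal call only names the required inputs. A sketch (not part of the diff), with placeholder paths and values:

```python
# Assumes: train_op = components.load_component_from_url('.../dnntrainer/component.yaml')
training = train_op(
    transformed_data_dir=preprocess.output,       # upstream TFT task
    schema='gs://my-bucket/schema.json',          # placeholder
    steps=3000,                                   # placeholder step budget
    target='target',                              # placeholder column
    training_output_dir='gs://my-bucket/train')   # placeholder
# Omitted inputs fall back to the YAML defaults:
# optimizer='Adagrad', learning_rate=0.1, hidden_layer_size='100'.
```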
diff --git a/components/kubeflow/katib-launcher/component.yaml b/components/kubeflow/katib-launcher/component.yaml
new file mode 100644
index 00000000000..0cbcdf4c88a
--- /dev/null
+++ b/components/kubeflow/katib-launcher/component.yaml
@@ -0,0 +1,38 @@
+name: Kubeflow - Launch StudyJob
+description: Kubeflow StudyJob launcher
+inputs:
+- {name: StudyJob name, type: String, description: 'Job name.'}
+- {name: Namespace, type: String, default: kubeflow, description: 'Namespace.'}
+- {name: Optimization type, type: String, default: minimize, description: 'Direction of optimization: minimize or maximize.'}
+- {name: Objective value name, type: String, description: 'Name of the objective value that the trainer optimizes.'}
+- {name: Optimization goal, type: Float, description: 'Stop the study once the objective value exceeds this goal.'}
+- {name: Request count, type: Integer, default: 1, description: 'Number of requests to the suggestion service.'}
+- {name: Metrics names, type: String, description: 'List of metric names (comma-delimited).'}
+- {name: Parameter configs, type: YAML, default: '', description: 'Parameter configs (YAML/JSON format).'}
+- {name: NAS config, type: YAML, default: '', description: 'NAS config (YAML/JSON format).'}
+- {name: Worker template path, type: String, default: '', description: 'Worker spec.'}
+- {name: Metrics collector template path, type: String, default: '', description: 'Metrics collector spec.'}
+- {name: Suggestion spec, type: YAML, default: '', description: 'Suggestion spec (YAML/JSON format).'}
+- {name: StudyJob timeout minutes, type: Integer, default: '10', description: 'Time in minutes to wait for the StudyJob to complete.'}
+outputs:
+- {name: Best parameter set, type: JSON, description: 'The parameter set of the best StudyJob trial.'}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-kubeflow-studyjob:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python, /ml/launch_study_job.py]
+    args: [
+      --name, {inputValue: StudyJob name},
+      --namespace, {inputValue: Namespace},
+      --optimizationtype, {inputValue: Optimization type},
+      --objectivevaluename, {inputValue: Objective value name},
+      --optimizationgoal, {inputValue: Optimization goal},
+      --requestcount, {inputValue: Request count},
+      --metricsnames, {inputValue: Metrics names},
+      --parameterconfigs, {inputValue: Parameter configs},
+      --nasConfig, {inputValue: NAS config},
+      --workertemplatepath, {inputValue: Worker template path},
+      --mcollectortemplatepath, {inputValue: Metrics collector template path},
+      --suggestionspec, {inputValue: Suggestion spec},
+      --studyjobtimeoutminutes, {inputValue: StudyJob timeout minutes},
+      --outputfile, {outputPath: Best parameter set},
+    ]
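A sketch (not part of the diff) of invoking the launcher with an inline parameter config. The YAML string below mimics the Katib v1alpha1 StudyJob `parameterconfigs` shape and is illustrative only — consult the Katib API for the authoritative schema; names, paths, and goals are placeholders:

```python
# Assumes: study_op = components.load_component_from_url('.../katib-launcher/component.yaml')
parameter_configs = '''
- name: --learning-rate
  parametertype: double
  feasible:
    min: "0.01"
    max: "0.1"
'''  # illustrative Katib v1alpha1-style config, not verified against the API
best_params = study_op(
    studyjob_name='my-study',                      # placeholder
    optimization_type='maximize',
    objective_value_name='accuracy',               # must match a reported metric
    optimization_goal=0.99,
    metrics_names='accuracy',
    parameter_configs=parameter_configs,
    worker_template_path='/worker-template.yaml',  # placeholder
    studyjob_timeout_minutes=60)
# The best trial's parameters land in the file bound to {outputPath: Best parameter set}
# and surface downstream as best_params.outputs['best_parameter_set'] (sanitized name).
```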
diff --git a/components/local/confusion_matrix/component.yaml b/components/local/confusion_matrix/component.yaml
new file mode 100644
index 00000000000..623fbf08204
--- /dev/null
+++ b/components/local/confusion_matrix/component.yaml
@@ -0,0 +1,20 @@
+name: Confusion matrix
+description: Calculates the confusion matrix
+inputs:
+  - {name: Predictions, type: GCPPath, description: 'GCS path of prediction file pattern.'} # type: {GCSPath: {data_type: CSV}}
+  - {name: Output dir, type: GCPPath, description: 'GCS path of the output directory.'} # type: {GCSPath: {path_type: Directory}}
+#outputs:
+#  - {name: UI metadata, type: UI metadata}
+#  - {name: Metrics, type: Metrics}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/confusion_matrix.py]
+    args: [
+      --predictions, {inputValue: Predictions},
+      --output, {inputValue: Output dir},
+    ]
+#Argo deletes the source files as soon as it uploads them to the artifact store. Trying to output the same files as parameter outputs fails since the source files are already deleted.
+# fileOutputs:
+#   UI metadata: /mlpipeline-ui-metadata.json
+#   Metrics: /mlpipeline-metrics.json
diff --git a/components/local/roc/component.yaml b/components/local/roc/component.yaml
new file mode 100644
index 00000000000..cd6b65a70bd
--- /dev/null
+++ b/components/local/roc/component.yaml
@@ -0,0 +1,25 @@
+name: ROC curve
+description: Calculates Receiver Operating Characteristic curve. See https://en.wikipedia.org/wiki/Receiver_operating_characteristic
+inputs:
+  - {name: Predictions dir, type: GCPPath, description: 'GCS path of prediction file pattern.'} #TODO: Replace dir with data + schema files # type: {GCSPath: {path_type: Directory}}
+  - {name: True class, type: String, default: 'true', description: 'The true class label for the sample. Default is "true".'}
+  - {name: True score column, type: String, default: 'true', description: 'The name of the column for positive probability.'}
+  - {name: Target lambda, type: String, default: '', description: 'Text of Python lambda function which returns boolean value indicating whether the classification result is correct. For example, "lambda x: x[''a''] and x[''b'']". If missing, input must have a "target" column.'}
+  - {name: Output dir, type: GCPPath, description: 'GCS path of the output directory.'} #TODO: Replace dir with single file # type: {GCSPath: {path_type: Directory}}
+#outputs:
+#  - {name: UI metadata, type: UI metadata}
+#  - {name: Metrics, type: Metrics}
+implementation:
+  container:
+    image: gcr.io/ml-pipeline/ml-pipeline-local-roc:2c2445df83fa879387a200747cc20f72a7ee9727
+    command: [python2, /ml/roc.py]
+    args: [
+      --predictions, {inputValue: Predictions dir},
+      --trueclass, {inputValue: True class},
+      --true_score_column, {inputValue: True score column},
+      --target_lambda, {inputValue: Target lambda},
+      --output, {inputValue: Output dir},
+    ]
+# fileOutputs:
+#   UI metadata: /mlpipeline-ui-metadata.json
+#   Metrics: /mlpipeline-metrics.json
diff --git a/samples/kubeflow-tf/kubeflow-training-classification.py b/samples/kubeflow-tf/kubeflow-training-classification.py
index eb40cb42cc6..56423088175 100755
--- a/samples/kubeflow-tf/kubeflow-training-classification.py
+++ b/samples/kubeflow-tf/kubeflow-training-classification.py
@@ -13,75 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import kfp.dsl as dsl
 import kfp.gcp as gcp
-import datetime
-
-def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):
-  return dsl.ContainerOp(
-    name = step_name,
-    image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:2c2445df83fa879387a200747cc20f72a7ee9727',
-    arguments = [
-      '--train', train_data,
-      '--eval', evaluation_data,
-      '--schema', schema,
-      '--project', project,
-      '--mode', preprocess_mode,
-      '--preprocessing-module', preprocess_module,
-      '--output', transform_output,
-    ],
-    file_outputs = {'transformed': '/output.txt'}
-  )
-
-
-def kubeflow_tf_training_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training', use_gpu=False):
-  kubeflow_tf_training_op = dsl.ContainerOp(
-    name = step_name,
-    image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:2c2445df83fa879387a200747cc20f72a7ee9727',
-    arguments = [
-      '--transformed-data-dir', transformed_data_dir,
-      '--schema', schema,
-      '--learning-rate', learning_rate,
-      '--hidden-layer-size', hidden_layer_size,
-      '--steps', steps,
-      '--target', target,
-      '--preprocessing-module', preprocess_module,
-      '--job-dir', training_output,
-    ],
-    file_outputs = {'train': '/output.txt'}
-  )
-  if use_gpu:
-    kubeflow_tf_training_op.image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer-gpu:2c2445df83fa879387a200747cc20f72a7ee9727'
-    kubeflow_tf_training_op.set_gpu_limit(1)
-
-  return kubeflow_tf_training_op
-
-
-def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):
-  return dsl.ContainerOp(
-    name = step_name,
-    image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:2c2445df83fa879387a200747cc20f72a7ee9727',
-    arguments = [
-      '--data', evaluation_data,
-      '--schema', schema,
-      '--target', target,
-      '--model', model,
-      '--mode', predict_mode,
-      '--project', project,
-      '--output', prediction_output,
-    ],
-    file_outputs = {'prediction': '/output.txt'}
-  )
 
+from kfp import components
 
-def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
-  return dsl.ContainerOp(
-    name = step_name,
-    image = 'gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:2c2445df83fa879387a200747cc20f72a7ee9727',
-    arguments = [
-      '--predictions', predictions,
-      '--output', output,
-    ]
-  )
+dataflow_tf_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/dataflow/tft/component.yaml')
+kubeflow_tf_training_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/dnntrainer/component.yaml')
+dataflow_tf_predict_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/dataflow/predict/component.yaml')
+confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/local/confusion_matrix/component.yaml')
 
 @dsl.pipeline(
   name='Pipeline TFJob',
@@ -104,10 +44,45 @@ def kubeflow_training(output, project,
   # set the flag to use GPU trainer
   use_gpu = False
 
-  preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, '', '%s/%s/transformed' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
-  training = kubeflow_tf_training_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, '', '%s/%s/train' % (output, workflow), use_gpu=use_gpu).apply(gcp.use_gcp_secret('user-gcp-sa'))
-  prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, '%s/%s/predict' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
-  confusion_matrix = confusion_matrix_op(prediction.output, '%s/%s/confusionmatrix' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
+  preprocess = dataflow_tf_transform_op(
+      training_data_file_pattern=train,
+      evaluation_data_file_pattern=evaluation,
+      schema=schema,
+      gcp_project=project,
+      run_mode=preprocess_mode,
+      preprocessing_module='',
+      transformed_data_dir='%s/%s/transformed' % (output, workflow)
+  ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+
+  training = kubeflow_tf_training_op(
+      transformed_data_dir=preprocess.output,
+      schema=schema,
+      learning_rate=learning_rate,
+      hidden_layer_size=hidden_layer_size,
+      steps=steps,
+      target=target,
+      preprocessing_module='',
+      training_output_dir='%s/%s/train' % (output, workflow)
+  ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+
+  if use_gpu:
+    training.image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer-gpu:2c2445df83fa879387a200747cc20f72a7ee9727'
+    training.set_gpu_limit(1)
+
+  prediction = dataflow_tf_predict_op(
+      data_file_pattern=evaluation,
+      schema=schema,
+      target_column=target,
+      model=training.output,
+      run_mode=predict_mode,
+      gcp_project=project,
+      predictions_dir='%s/%s/predict' % (output, workflow)
+  ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+
+  confusion_matrix = confusion_matrix_op(
+      predictions=prediction.output,
+      output_dir='%s/%s/confusionmatrix' % (output, workflow)
+  ).apply(gcp.use_gcp_secret('user-gcp-sa'))
 
 
 if __name__ == '__main__':
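For reference, the sample's entry point (just past the hunk above, unchanged by this PR) compiles the decorated pipeline function into an Argo package with the standard compiler call:

```python
if __name__ == '__main__':
  import kfp.compiler as compiler
  # Produces a .tar.gz workflow package that can be uploaded to the Pipelines UI.
  compiler.Compiler().compile(kubeflow_training, __file__ + '.tar.gz')
```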
diff --git a/samples/xgboost-spark/xgboost-training-cm.py b/samples/xgboost-spark/xgboost-training-cm.py
index b828153389d..05ec4c07850 100755
--- a/samples/xgboost-spark/xgboost-training-cm.py
+++ b/samples/xgboost-spark/xgboost-training-cm.py
@@ -17,6 +17,10 @@
 import kfp.dsl as dsl
 import kfp.gcp as gcp
+from kfp import components
+
+confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/local/confusion_matrix/component.yaml')
+roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/local/roc/component.yaml')
 
 # ================================================================
 # The following classes should be provided by components provider.
@@ -135,32 +139,6 @@ def __init__(self, name, project, region, cluster_name, data, model, target, ana
         ],
         file_outputs={'output': '/output.txt'})
 
-
-class ConfusionMatrixOp(dsl.ContainerOp):
-
-  def __init__(self, name, predictions, output):
-    super(ConfusionMatrixOp, self).__init__(
-      name=name,
-      image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:2c2445df83fa879387a200747cc20f72a7ee9727',
-      arguments=[
-        '--output', output,
-        '--predictions', predictions
-      ])
-
-
-class RocOp(dsl.ContainerOp):
-
-  def __init__(self, name, predictions, trueclass, output):
-    super(RocOp, self).__init__(
-      name=name,
-      image='gcr.io/ml-pipeline/ml-pipeline-local-roc:2c2445df83fa879387a200747cc20f72a7ee9727',
-      arguments=[
-        '--output', output,
-        '--predictions', predictions,
-        '--trueclass', trueclass,
-        '--true_score_column', trueclass,
-      ])
-
 # =======================================================================
 
 @dsl.pipeline(
@@ -197,10 +175,10 @@ def xgb_train_pipeline(
     predict_op = PredictOp('predict', project, region, create_cluster_op.output,
         transform_op.outputs['eval'], train_op.output, target, analyze_op.output,
         '%s/{{workflow.name}}/predict' % output).apply(gcp.use_gcp_secret('user-gcp-sa'))
 
-    confusion_matrix_op = ConfusionMatrixOp('confusion-matrix', predict_op.output,
+    confusion_matrix_task = confusion_matrix_op(predict_op.output,
         '%s/{{workflow.name}}/confusionmatrix' % output).apply(gcp.use_gcp_secret('user-gcp-sa'))
 
-    roc_op = RocOp('roc', predict_op.output, true_label, '%s/{{workflow.name}}/roc' % output).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    roc_task = roc_op(predictions_dir=predict_op.output, true_class=true_label, true_score_column=true_label, output_dir='%s/{{workflow.name}}/roc' % output).apply(gcp.use_gcp_secret('user-gcp-sa'))
 
 if __name__ == '__main__':
     import kfp.compiler as compiler
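One caveat behind the keyword-argument style used for the `roc_op` call above: a factory generated from component.yaml orders its positional parameters by the YAML `inputs` list, so a bare positional call can silently mis-bind. A sketch of the failure mode and the robust form, assuming the usual pythonized parameter names:

```python
# Signature implied by roc/component.yaml's inputs order:
#   roc_op(predictions_dir, true_class, true_score_column, target_lambda, output_dir)
#
# A positional third argument would bind the output path to true_score_column:
#   roc_op(predict_op.output, true_label, '%s/roc' % output)   # wrong binding
#
# Keyword arguments keep the call site immune to input reordering. Note the old
# RocOp passed trueclass for both --trueclass and --true_score_column, preserved here:
roc_task = roc_op(
    predictions_dir=predict_op.output,
    true_class=true_label,
    true_score_column=true_label,
    output_dir='%s/{{workflow.name}}/roc' % output)
```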