From 883091542471c4535d918b6d41f3422c803f927b Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Wed, 22 Apr 2020 15:53:45 -0700 Subject: [PATCH] Support input artifact (#104) * support input artifacts * fix conflicts * fix merging bug --- .../kfp_tekton/compiler/_op_to_template.py | 45 ++++-- sdk/python/tests/compiler/compiler_tests.py | 9 +- .../testdata/input_artifact_raw_value.py | 70 +++++++++ .../testdata/input_artifact_raw_value.txt | 1 + .../testdata/input_artifact_raw_value.yaml | 142 ++++++++++++++++++ sdk/python/tests/test_kfp_samples_report.txt | 2 +- 6 files changed, 256 insertions(+), 13 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.py create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml diff --git a/sdk/python/kfp_tekton/compiler/_op_to_template.py b/sdk/python/kfp_tekton/compiler/_op_to_template.py index 3c4fb34c045..e7f55b73e2e 100644 --- a/sdk/python/kfp_tekton/compiler/_op_to_template.py +++ b/sdk/python/kfp_tekton/compiler/_op_to_template.py @@ -23,6 +23,7 @@ import yaml import re import os +import copy from .. import tekton_api_version @@ -213,6 +214,17 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): steps.extend(template['spec']['steps']) template['spec']['steps'] = steps + # initial base_step and volume setup + base_step = { + 'image': 'busybox', + 'name': 'copy-results', + 'script': '#!/bin/sh\nset -exo pipefail\n' + } + volume_mount_step_template = [] + volume_template = [] + mounted_param_paths = [] + replaced_param_list = [] + # inputs input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None @@ -222,8 +234,27 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): template['spec']['params'] = inputs['parameters'] elif isinstance(op, dsl.ResourceOp): template['spec']['params'].extend(inputs['parameters']) - elif 'artifacts' in inputs: - raise NotImplementedError("input artifacts are not yet implemented") + if 'artifacts' in inputs: + # The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input. + # Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation. + copy_inputs_step = copy.deepcopy(base_step) + copy_inputs_step['name'] = 'copy-inputs' + for artifact in inputs['artifacts']: + if 'raw' in artifact: + copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (artifact['raw']['data'], artifact['path']) + mountPath = artifact['path'].rsplit("/", 1)[0] + if mountPath not in mounted_param_paths: + volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]}) + volume_template.append({'name': artifact['name'], 'emptyDir': {}}) + mounted_param_paths.append(mountPath) + copy_inputs_step['script'] = literal_str(copy_inputs_step['script']) + with_inputs_step = [copy_inputs_step] + with_inputs_step.extend(template['spec']['steps']) + template['spec']['steps'] = with_inputs_step + if volume_mount_step_template: + template['spec']['stepTemplate'] = {} + template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template + template['spec']['volumes'] = volume_template # outputs if isinstance(op, dsl.ContainerOp): @@ -234,10 +265,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): param_outputs = {} outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts) if outputs_dict: - volume_mount_step_template = [] - volume_template = [] - mounted_param_paths = [] - replaced_param_list = [] if outputs_dict.get('parameters'): """ Since Tekton results need to be under /tekton/results. If file output paths cannot be @@ -254,11 +281,7 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): cp $LOCALPATH $(results.data.path); """ template['spec']['results'] = [] - copy_results_step = { - 'image': 'busybox', - 'name': 'copy-results', - 'script': '#!/bin/sh\nset -exo pipefail\n' - } + copy_results_step = copy.deepcopy(base_step) for name, path in processed_op.file_outputs.items(): name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore template['spec']['results'].append({ diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index bbe6b8875f3..e9074a02e1a 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -189,11 +189,18 @@ def test_pipeline_transformers_workflow(self): def test_artifact_location_workflow(self): """ - Test compiling a artifact location workflow. + Test compiling an artifact location workflow. """ from .testdata.artifact_location import custom_artifact_location self._test_pipeline_workflow(custom_artifact_location, 'artifact_location.yaml', enable_artifacts=True) + def test_input_artifact_raw_value_workflow(self): + """ + Test compiling an input artifact workflow. + """ + from .testdata.input_artifact_raw_value import input_artifact_pipeline + self._test_pipeline_workflow(input_artifact_pipeline, 'input_artifact_raw_value.yaml') + def test_katib_workflow(self): """ Test compiling a katib workflow. diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py new file mode 100644 index 00000000000..3c2d6191a3c --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py @@ -0,0 +1,70 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import sys +from pathlib import Path + +sys.path.insert(0, __file__ + '/../../../../') + +import kfp +from kfp import dsl + + +def component_with_inline_input_artifact(text: str): + return dsl.ContainerOp( + name='component_with_inline_input_artifact', + image='alpine', + command=['cat', dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')], # path and input are optional + ) + + +def component_with_input_artifact(text): + '''A component that passes text as input artifact''' + + return dsl.ContainerOp( + name='component_with_input_artifact', + artifact_argument_paths=[ + dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text'), # path and input are optional + ], + image='alpine', + command=['cat', '/tmp/inputs/text/data'], + ) + +def component_with_hardcoded_input_artifact_value(): + '''A component that passes hard-coded text as input artifact''' + return component_with_input_artifact('hard-coded artifact value') + + +def component_with_input_artifact_value_from_file(file_path): + '''A component that passes contents of a file as input artifact''' + return component_with_input_artifact(Path(file_path).read_text()) + + +@dsl.pipeline( + name='Pipeline with artifact input raw argument value.', + description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.' +) +def input_artifact_pipeline(): + component_with_inline_input_artifact('Constant artifact value') + component_with_input_artifact('Constant artifact value') + component_with_hardcoded_input_artifact_value() + + file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt')) + component_with_input_artifact_value_from_file(file_path) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(input_artifact_pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt new file mode 100644 index 00000000000..7953e7335b7 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt @@ -0,0 +1 @@ +Text from a file with hard-coded artifact value diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml new file mode 100644 index 00000000000..9a7c9583a6f --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml @@ -0,0 +1,142 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-inline-input-artifact +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Constant artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-inline-input-artifact + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Constant artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact-2 +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "hard-coded artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact-2 + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact-3 +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Text from a file with hard-coded artifact value + " > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact-3 + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to + define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with + artifact input raw argument value."}' + name: pipeline-with-artifact-input-raw-argument-value +spec: + params: [] + tasks: + - name: component-with-inline-input-artifact + params: [] + taskRef: + name: component-with-inline-input-artifact + - name: component-with-input-artifact + params: [] + taskRef: + name: component-with-input-artifact + - name: component-with-input-artifact-2 + params: [] + taskRef: + name: component-with-input-artifact-2 + - name: component-with-input-artifact-3 + params: [] + taskRef: + name: component-with-input-artifact-3 diff --git a/sdk/python/tests/test_kfp_samples_report.txt b/sdk/python/tests/test_kfp_samples_report.txt index 12d0412c26f..8bde37b469d 100644 --- a/sdk/python/tests/test_kfp_samples_report.txt +++ b/sdk/python/tests/test_kfp_samples_report.txt @@ -5,7 +5,7 @@ SUCCESS: basic_no_decorator.py SUCCESS: coin.py SUCCESS: compose.py SUCCESS: default_value.py -FAILURE: input_artifact_raw_value.py +SUCCESS: input_artifact_raw_value.py FAILURE: loop_over_lightweight_output.py SUCCESS: param_op_transform.py SUCCESS: param_substitutions.py