From 95557c6d361889dff1a9c60971576cc8b92c9b4f Mon Sep 17 00:00:00 2001 From: Gang Pu Date: Thu, 20 May 2021 15:51:46 +0800 Subject: [PATCH] Support parallelism feature on DSL side (#592) --- .../kfp_tekton/compiler/_tekton_handler.py | 4 + sdk/python/kfp_tekton/compiler/compiler.py | 2 + sdk/python/tests/compiler/compiler_tests.py | 9 +++ .../testdata/loop_static_with_parallelism.py | 55 +++++++++++++ .../loop_static_with_parallelism.yaml | 72 +++++++++++++++++ ...tic_with_parallelism_pipelineloop_cr1.yaml | 80 +++++++++++++++++++ tekton-catalog/pipeline-loops/.gitignore | 2 + 7 files changed, 224 insertions(+) create mode 100644 sdk/python/tests/compiler/testdata/loop_static_with_parallelism.py create mode 100644 sdk/python/tests/compiler/testdata/loop_static_with_parallelism.yaml create mode 100644 sdk/python/tests/compiler/testdata/loop_static_with_parallelism_pipelineloop_cr1.yaml create mode 100644 tekton-catalog/pipeline-loops/.gitignore diff --git a/sdk/python/kfp_tekton/compiler/_tekton_handler.py b/sdk/python/kfp_tekton/compiler/_tekton_handler.py index bbced1089b3..b77b976d5c9 100644 --- a/sdk/python/kfp_tekton/compiler/_tekton_handler.py +++ b/sdk/python/kfp_tekton/compiler/_tekton_handler.py @@ -153,6 +153,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task # add loop special filed custom_task_cr['kind'] = 'PipelineLoop' + if custom_task[custom_task_key]['spec'].get('parallelism') is not None: + custom_task_cr['spec']['parallelism'] = custom_task[custom_task_key]['spec']['parallelism'] + # remove from pipeline run spec + del custom_task[custom_task_key]['spec']['parallelism'] custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args'] for custom_task_param in custom_task[custom_task_key]['spec']['params']: if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']: diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index dcc36e160a7..8c28ae96677 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -393,6 +393,8 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies, pipeline_ self.loops_pipeline[group_name]['spec']['params'].append({ 'name': param[0], 'value': '$(params.%s)' % param[0] }) + if sub_group.parallelism is not None: + self.loops_pipeline[group_name]['spec']['parallelism'] = sub_group.parallelism return template diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 63388532bcd..00baeac495c 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -140,6 +140,15 @@ def test_sidecar_workflow(self): from .testdata.sidecar import sidecar_pipeline self._test_pipeline_workflow(sidecar_pipeline, 'sidecar.yaml') + def test_loop_parallelism_workflow(self): + """ + Test compiling a loop with parallelism defined workflow. + """ + from .testdata.loop_static_with_parallelism import pipeline + self._test_pipeline_workflow( + pipeline, + 'loop_static_with_parallelism.yaml') + def test_loop_static_workflow(self): """ Test compiling a loop static params in workflow. diff --git a/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.py b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.py new file mode 100644 index 00000000000..b7c5f5405b2 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.py @@ -0,0 +1,55 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kfp_tekton.compiler import TektonCompiler + + +class Coder: + def empty(self): + return "" + + +TektonCompiler._get_unique_id_code = Coder.empty + + +@dsl.pipeline(name='para-loop-pipeline') +def pipeline(my_pipe_param='10'): + loop_args = [1, 2, 3] + with dsl.ParallelFor(loop_args=loop_args, parallelism=5) as item: + op1 = dsl.ContainerOp( + name="para-loop-inner-op1", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo op1 %s %s" % (item, my_pipe_param)], + ) + + op2 = dsl.ContainerOp( + name="para-loop-inner-op2", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo op2 %s" % item], + ) + + op_out = dsl.ContainerOp( + name="para-loop-out-op", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo %s" % my_pipe_param], + ) + + +if __name__ == '__main__': + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.yaml b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.yaml new file mode 100644 index 00000000000..a10cab33e92 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism.yaml @@ -0,0 +1,72 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param", + "optional": true}], "name": "para-loop-pipeline"}' + sidecar.istio.io/inject: 'false' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"para-loop-inner-op1": [], "para-loop-inner-op2": + [], "para-loop-out-op": []}' + tekton.dev/input_artifacts: '{}' + tekton.dev/output_artifacts: '{}' + name: para-loop-pipeline +spec: + params: + - name: my_pipe_param + value: '10' + pipelineSpec: + params: + - default: '10' + name: my_pipe_param + tasks: + - name: para-loop-out-op + params: + - name: my_pipe_param + value: $(params.my_pipe_param) + taskSpec: + metadata: + annotations: + tekton.dev/template: '' + labels: + pipelines.kubeflow.org/cache_enabled: 'true' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/pipelinename: '' + params: + - name: my_pipe_param + steps: + - args: + - echo $(inputs.params.my_pipe_param) + command: + - sh + - -c + image: library/bash:4.4.23 + name: main + timeout: 0s + - name: para-loop-pipeline-for-loop-2 + params: + - name: loop-item-param-1 + value: '[1, 2, 3]' + - name: my_pipe_param + value: $(params.my_pipe_param) + taskRef: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + name: para-loop-pipeline-for-loop-2 + timeout: 0s diff --git a/sdk/python/tests/compiler/testdata/loop_static_with_parallelism_pipelineloop_cr1.yaml b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism_pipelineloop_cr1.yaml new file mode 100644 index 00000000000..585fd0af0a1 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/loop_static_with_parallelism_pipelineloop_cr1.yaml @@ -0,0 +1,80 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: custom.tekton.dev/v1alpha1 +kind: PipelineLoop +metadata: + name: para-loop-pipeline-for-loop-2 +spec: + pipelineSpec: + params: + - name: loop-item-param-1 + type: string + - name: my_pipe_param + type: string + tasks: + - name: para-loop-inner-op1 + params: + - name: loop-item-param-1 + value: $(params.loop-item-param-1) + - name: my_pipe_param + value: $(params.my_pipe_param) + taskSpec: + steps: + - name: main + args: + - echo op1 $(inputs.params.loop-item-param-1) $(inputs.params.my_pipe_param) + command: + - sh + - -c + image: library/bash:4.4.23 + params: + - name: loop-item-param-1 + type: string + - name: my_pipe_param + type: string + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + tekton.dev/template: '' + timeout: 0s + - name: para-loop-inner-op2 + params: + - name: loop-item-param-1 + value: $(params.loop-item-param-1) + taskSpec: + steps: + - name: main + args: + - echo op2 $(inputs.params.loop-item-param-1) + command: + - sh + - -c + image: library/bash:4.4.23 + params: + - name: loop-item-param-1 + type: string + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + tekton.dev/template: '' + timeout: 0s + parallelism: 5 + iterateParam: loop-item-param-1 diff --git a/tekton-catalog/pipeline-loops/.gitignore b/tekton-catalog/pipeline-loops/.gitignore new file mode 100644 index 00000000000..16e08bc2f32 --- /dev/null +++ b/tekton-catalog/pipeline-loops/.gitignore @@ -0,0 +1,2 @@ +.ko.yaml +