Skip to content

Commit

Permalink
Support parallelism feature on DSL side (kubeflow#592)
Browse files Browse the repository at this point in the history
  • Loading branch information
pugangxa authored May 20, 2021
1 parent 7a6423e commit 95557c6
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 0 deletions.
4 changes: 4 additions & 0 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task

# add loop special filed
custom_task_cr['kind'] = 'PipelineLoop'
if custom_task[custom_task_key]['spec'].get('parallelism') is not None:
custom_task_cr['spec']['parallelism'] = custom_task[custom_task_key]['spec']['parallelism']
# remove from pipeline run spec
del custom_task[custom_task_key]['spec']['parallelism']
custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args']
for custom_task_param in custom_task[custom_task_key]['spec']['params']:
if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']:
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies, pipeline_
self.loops_pipeline[group_name]['spec']['params'].append({
'name': param[0], 'value': '$(params.%s)' % param[0]
})
if sub_group.parallelism is not None:
self.loops_pipeline[group_name]['spec']['parallelism'] = sub_group.parallelism

return template

Expand Down
9 changes: 9 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ def test_sidecar_workflow(self):
from .testdata.sidecar import sidecar_pipeline
self._test_pipeline_workflow(sidecar_pipeline, 'sidecar.yaml')

def test_loop_parallelism_workflow(self):
"""
Test compiling a loop with parallelism defined workflow.
"""
from .testdata.loop_static_with_parallelism import pipeline
self._test_pipeline_workflow(
pipeline,
'loop_static_with_parallelism.yaml')

def test_loop_static_workflow(self):
"""
Test compiling a loop static params in workflow.
Expand Down
55 changes: 55 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_static_with_parallelism.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl
from kfp_tekton.compiler import TektonCompiler


class Coder:
def empty(self):
return ""


TektonCompiler._get_unique_id_code = Coder.empty


@dsl.pipeline(name='para-loop-pipeline')
def pipeline(my_pipe_param='10'):
loop_args = [1, 2, 3]
with dsl.ParallelFor(loop_args=loop_args, parallelism=5) as item:
op1 = dsl.ContainerOp(
name="para-loop-inner-op1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s" % (item, my_pipe_param)],
)

op2 = dsl.ContainerOp(
name="para-loop-inner-op2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op2 %s" % item],
)

op_out = dsl.ContainerOp(
name="para-loop-out-op",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo %s" % my_pipe_param],
)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true}], "name": "para-loop-pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"para-loop-inner-op1": [], "para-loop-inner-op2":
[], "para-loop-out-op": []}'
tekton.dev/input_artifacts: '{}'
tekton.dev/output_artifacts: '{}'
name: para-loop-pipeline
spec:
params:
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- default: '10'
name: my_pipe_param
tasks:
- name: para-loop-out-op
params:
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
params:
- name: my_pipe_param
steps:
- args:
- echo $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
name: main
timeout: 0s
- name: para-loop-pipeline-for-loop-2
params:
- name: loop-item-param-1
value: '[1, 2, 3]'
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: para-loop-pipeline-for-loop-2
timeout: 0s
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
metadata:
name: para-loop-pipeline-for-loop-2
spec:
pipelineSpec:
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
tasks:
- name: para-loop-inner-op1
params:
- name: loop-item-param-1
value: $(params.loop-item-param-1)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
steps:
- name: main
args:
- echo op1 $(inputs.params.loop-item-param-1) $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
tekton.dev/template: ''
timeout: 0s
- name: para-loop-inner-op2
params:
- name: loop-item-param-1
value: $(params.loop-item-param-1)
taskSpec:
steps:
- name: main
args:
- echo op2 $(inputs.params.loop-item-param-1)
command:
- sh
- -c
image: library/bash:4.4.23
params:
- name: loop-item-param-1
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
tekton.dev/template: ''
timeout: 0s
parallelism: 5
iterateParam: loop-item-param-1
2 changes: 2 additions & 0 deletions tekton-catalog/pipeline-loops/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.ko.yaml

0 comments on commit 95557c6

Please sign in to comment.