
feat(backend): Support loop pipelineruns of defining dictionaries params (kubeflow#460)
fenglixa authored Feb 8, 2021
1 parent ac8b697 commit 99e506c
Showing 21 changed files with 1,433 additions and 23 deletions.
1 change: 1 addition & 0 deletions sdk/FEATURES.md
@@ -137,6 +137,7 @@ To see how the Python SDK provides this feature, refer to the examples below:
- [loop_static](/sdk/python/tests/compiler/testdata/loop_static.py)
- [withparam_global](/sdk/python/tests/compiler/testdata/withparam_global.py)
- [withitem_nested](/sdk/python/tests/compiler/testdata/withitem_nested.py)
- [parallelfor_item_argument_resolving](/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py)

### Any Sequencer

16 changes: 16 additions & 0 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
@@ -128,13 +128,29 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict):
        }
        # handle loop special case
        if custom_task[custom_task_key]['kind'] == 'loops':
            # if subvars exist, the loop iterates over dict parameters;
            # remove the aggregate loop_arg and add the subvar args to the CR params
            if custom_task[custom_task_key]['loop_sub_args'] != []:
                refreshed_cr_params = []
                for param in custom_task_cr['spec']['pipelineSpec']['params']:
                    if param['name'] != custom_task[custom_task_key]['loop_args']:
                        refreshed_cr_params.append(param)
                custom_task_cr['spec']['pipelineSpec']['params'] = refreshed_cr_params
                custom_task_cr['spec']['pipelineSpec']['params'].extend([{
                    'name': sub_param,
                    'type': 'string'
                } for sub_param in custom_task[custom_task_key]['loop_sub_args']])

            # add loop-specific fields
            custom_task_cr['kind'] = 'PipelineLoop'
            custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args']
            for custom_task_param in custom_task[custom_task_key]['spec']['params']:
                if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']:
                    custom_task_cr = json.loads(
                        json.dumps(custom_task_cr).replace(custom_task_param['value'], '$(params.%s)' % custom_task_param['name']))
        custom_task_crs.append(custom_task_cr)
        custom_task[custom_task_key]['spec']['params'] = sorted(custom_task[custom_task_key]['spec']['params'],
                                                                key=lambda k: k['name'])
        tasks.append(custom_task[custom_task_key]['spec'])

        # handle the nested custom task case
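To make the subvar expansion concrete, here is a minimal, self-contained sketch of the transformation above (parameter names are hypothetical, not part of the commit): when a loop iterates over dicts, the aggregate loop argument is dropped from the PipelineLoop CR's params and one string param is added per referenced dict key.

# Standalone sketch of the subvar expansion (hypothetical names).
custom_task = {
    'loop_args': 'loop-item-param-00000001',
    'loop_sub_args': ['loop-item-param-00000001-subvar-aaa',
                      'loop-item-param-00000001-subvar-bbb'],
}
cr_params = [{'name': 'loop-item-param-00000001', 'type': 'string'},
             {'name': 'other-param', 'type': 'string'}]

if custom_task['loop_sub_args']:
    # drop the aggregate loop argument ...
    cr_params = [p for p in cr_params if p['name'] != custom_task['loop_args']]
    # ... and add one string param per dict key ("subvar")
    cr_params.extend({'name': s, 'type': 'string'} for s in custom_task['loop_sub_args'])

print([p['name'] for p in cr_params])
# ['other-param', 'loop-item-param-00000001-subvar-aaa', 'loop-item-param-00000001-subvar-bbb']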
14 changes: 12 additions & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -166,14 +166,19 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
        self.loops_pipeline[group_name] = {
            'kind': 'loops',
            'loop_args': sub_group.loop_args.full_name,
            'loop_sub_args': [],
            'task_list': [],
            'spec': {},
            'depends': []
        }
        for subvarName in sub_group.loop_args.referenced_subvar_names:
            if subvarName != '__iter__':
                self.loops_pipeline[group_name]['loop_sub_args'].append(
                    sub_group.loop_args.full_name + '-subvar-' + subvarName)
        # collect the tasks that depend on the loop task
        for depend in dependencies.keys():
            if depend == sub_group.name:
                self.loops_pipeline[group_name]['spec']['runAfter'] = [task for task in dependencies[depend]]
                self.loops_pipeline[group_name]['spec']['runAfter'].sort()
            if sub_group.name in dependencies[depend]:
                self.loops_pipeline[group_name]['depends'].append({'org': depend, 'runAfter': group_name})
        for op in sub_group.groups + sub_group.ops:
@@ -223,14 +228,16 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
        for input in inputs.keys():
            if input == sub_group.name:
                for param in inputs[input]:
                    if param[0] != sub_group.loop_args.full_name and param[1] and \
                            param[0] not in self.loops_pipeline[group_name]['loop_sub_args']:
                        replace_str = param[1] + '-'
                        self.loops_pipeline[group_name]['spec']['params'].append({
                            'name': param[0], 'value': '$(tasks.%s.results.%s)' % (
                                param[1], sanitize_k8s_name(param[0].replace(replace_str, ''))
                            )
                        })
                    if param[0] != sub_group.loop_args.full_name and not param[1] and \
                            param[0] not in self.loops_pipeline[group_name]['loop_sub_args']:
                        self.loops_pipeline[group_name]['spec']['params'].append({
                            'name': param[0], 'value': '$(params.%s)' % param[0]
                        })
@@ -454,6 +461,9 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
        # process input parameters from upstream tasks
        pipeline_param_names = [p['name'] for p in params]
        loop_args = [self.loops_pipeline[key]['loop_args'] for key in self.loops_pipeline.keys()]
        for key in self.loops_pipeline.keys():
            if self.loops_pipeline[key]['loop_sub_args'] != []:
                loop_args.extend(self.loops_pipeline[key]['loop_sub_args'])
        for task in task_refs:
            op = pipeline.ops.get(task['name'])
            for tp in task.get('params', []):
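The '-subvar-' naming is the bridge between the DSL and the handler: accessing a key on a dict loop item records that key in referenced_subvar_names, and the compiler derives one Tekton param name per key, as sketched below with illustrative values (in the compiler they come from sub_group.loop_args):

# Illustrative stand-ins for sub_group.loop_args attributes.
loop_args_full_name = 'loop-item-param-00000001'
referenced_subvar_names = ['__iter__', 'aaa', 'bbb']

# '__iter__' is an internal marker and is skipped, mirroring the code above.
loop_sub_args = [loop_args_full_name + '-subvar-' + name
                 for name in referenced_subvar_names
                 if name != '__iter__']
print(loop_sub_args)
# ['loop-item-param-00000001-subvar-aaa', 'loop-item-param-00000001-subvar-bbb']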
21 changes: 21 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -148,6 +148,27 @@ def test_withparam_global_workflow(self):
        from .testdata.withparam_global import pipeline
        self._test_pipeline_workflow(pipeline, 'withparam_global.yaml')

    def test_withparam_global_dict_workflow(self):
        """
        Test compiling a withparam global dict workflow.
        """
        from .testdata.withparam_global_dict import pipeline
        self._test_pipeline_workflow(pipeline, 'withparam_global_dict.yaml')

    def test_withparam_output_dict_workflow(self):
        """
        Test compiling a withparam output dict workflow.
        """
        from .testdata.withparam_output_dict import pipeline
        self._test_pipeline_workflow(pipeline, 'withparam_output_dict.yaml')

    def test_parallelfor_item_argument_resolving_workflow(self):
        """
        Test compiling a parallelfor item argument resolving workflow.
        """
        from .testdata.parallelfor_item_argument_resolving import parallelfor_item_argument_resolving
        self._test_pipeline_workflow(parallelfor_item_argument_resolving, 'parallelfor_item_argument_resolving.yaml')

    def test_loop_over_lightweight_output_workflow(self):
        """
        Test compiling a loop over lightweight output workflow.
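Each new test compares the compiler's output against a checked-in golden YAML file. The _test_pipeline_workflow helper is not part of this diff; the following is only a rough sketch of what such a golden-file check could look like (helper name, paths, and comparison strategy are assumptions):

import yaml

from kfp_tekton.compiler import TektonCompiler


def _test_pipeline_workflow_sketch(pipeline_func, golden_yaml_name):
    # Compile the pipeline to YAML, then compare it structurally
    # against the golden copy assumed to live under testdata/.
    compiled_path = '/tmp/' + golden_yaml_name
    TektonCompiler().compile(pipeline_func, compiled_path)
    with open(compiled_path) as compiled, open('testdata/' + golden_yaml_name) as golden:
        assert yaml.safe_load(compiled) == yaml.safe_load(golden)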
81 changes: 81 additions & 0 deletions sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py
@@ -0,0 +1,81 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from kfp.components import func_to_container_op


# Stabilizing the test output
class StableIDGenerator:
    def __init__(self):
        self._index = 0

    def get_next_id(self):
        self._index += 1
        return '{code:0{num_chars:}d}'.format(code=self._index, num_chars=kfp.dsl._for_loop.LoopArguments.NUM_CODE_CHARS)


kfp.dsl.ParallelFor._get_unique_id_code = StableIDGenerator().get_next_id


@func_to_container_op
def produce_str() -> str:
    return "Hello"


@func_to_container_op
def produce_list_of_dicts() -> list:
    return ({"aaa": "aaa1", "bbb": "bbb1"}, {"aaa": "aaa2", "bbb": "bbb2"})


@func_to_container_op
def produce_list_of_strings() -> list:
    return ("a", "z")


@func_to_container_op
def produce_list_of_ints() -> list:
    return (1234567890, 987654321)


@func_to_container_op
def consume(param1):
    print(param1)


@kfp.dsl.pipeline()
def parallelfor_item_argument_resolving():
    produce_str_task = produce_str()
    produce_list_of_strings_task = produce_list_of_strings()
    produce_list_of_ints_task = produce_list_of_ints()
    produce_list_of_dicts_task = produce_list_of_dicts()

    with kfp.dsl.ParallelFor(produce_list_of_strings_task.output) as loop_item:
        consume(produce_list_of_strings_task.output)
        consume(loop_item)
        consume(produce_str_task.output)

    with kfp.dsl.ParallelFor(produce_list_of_ints_task.output) as loop_item:
        consume(produce_list_of_ints_task.output)
        consume(loop_item)

    with kfp.dsl.ParallelFor(produce_list_of_dicts_task.output) as loop_item:
        consume(produce_list_of_dicts_task.output)
        # consume(loop_item)  # Cannot use the full loop item when it's a dict
        consume(loop_item.aaa)


if __name__ == '__main__':
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(parallelfor_item_argument_resolving, __file__.replace('.py', '.yaml'))
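Per the handler change earlier in this commit, the dict loop above should compile to a PipelineLoop custom resource whose params carry the '-subvar-aaa' entry instead of the aggregate loop item. A hedged sketch of the expected shape, expressed as a Python dict (IDs are illustrative, not the exact golden YAML):

# Rough expected shape of the generated custom resource (illustrative).
expected_cr_shape = {
    'kind': 'PipelineLoop',
    'spec': {
        'iterateParam': 'loop-item-param-00000003',  # hypothetical stable ID
        'pipelineSpec': {
            'params': [
                # the aggregate dict param is replaced by its referenced key
                {'name': 'loop-item-param-00000003-subvar-aaa', 'type': 'string'},
            ],
        },
    },
}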
