diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index e0a4bde80ee..8729388290d 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -473,7 +473,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): # i.e., rather than a static list, they are either the output of another task or were input # as global pipeline parameters - pipeline_param = sub_group.loop_args + pipeline_param = sub_group.loop_args.items_or_pipeline_param if pipeline_param.op_name is None: withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name else: @@ -528,19 +528,25 @@ def get_arguments_for_sub_group( else: argument_name = param_name - # default argument_value + special cases - argument_value = '{{inputs.parameters.%s}}' % param_name + # Preparing argument. It can be pipeline input reference, task output reference or loop item (or loop item attribute) + sanitized_loop_arg_full_name = '---' if isinstance(sub_group, dsl.ParallelFor): - if sub_group.loop_args.name in param_name: - if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): - subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) - argument_value = '{{item.%s}}' % subvar_name - elif _for_loop.LoopArguments.name_is_loop_arguments(param_name) or sub_group.items_is_pipeline_param: - argument_value = '{{item}}' - else: - raise ValueError("Failed to match loop args with parameter. 
param_name: {}, ".format(param_name)) - elif dependent_name: - argument_value = '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + sanitized_loop_arg_full_name = sanitize_k8s_name(self._pipelineparam_full_name(sub_group.loop_args)) + arg_ref_full_name = sanitize_k8s_name(param_name) + # We only care about the reference to the current loop item, not the outer loops + if isinstance(sub_group, dsl.ParallelFor) and arg_ref_full_name.startswith(sanitized_loop_arg_full_name): + if arg_ref_full_name == sanitized_loop_arg_full_name: + argument_value = '{{item}}' + elif _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): + subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) + argument_value = '{{item.%s}}' % subvar_name + else: + raise ValueError("Argument seems to reference the loop item, but not the item itself and not some attribute of the item. param_name: {}, ".format(param_name)) + else: + if dependent_name: + argument_value = '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + else: + argument_value = '{{inputs.parameters.%s}}' % param_name arguments.append({ 'name': argument_name, diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 7399852b0d8..666c0350589 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -10,6 +10,7 @@ class LoopArguments(dsl.PipelineParam): """Class representing the arguments that are looped over in a ParallelFor loop in the KFP DSL. 
This doesn't need to be instantiated by the end user, rather it will be automatically created by a ParallelFor ops group.""" + LOOP_ITEM_NAME_BASE = 'loop-item' LOOP_ITEM_PARAM_NAME_BASE = 'loop-item-param' # number of characters in the code which is passed to the constructor NUM_CODE_CHARS = 8 @@ -52,7 +53,7 @@ def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_o if not self._subvar_name_is_legal(subvar_name): raise ValueError("Tried to create subvariable named {} but that's not a legal Python variable " "name.".format(subvar_name)) - setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name)) + setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name, loop_args_op_name=self.op_name)) self.items_or_pipeline_param = items self.referenced_subvar_names = [] @@ -62,7 +63,7 @@ def from_pipeline_param(cls, param: dsl.PipelineParam) -> 'LoopArguments': return LoopArguments( items=param, code=None, - name_override=param.name, + name_override=param.name + '-' + cls.LOOP_ITEM_NAME_BASE, op_name=param.op_name, value=param.value, ) @@ -71,7 +72,7 @@ def __getattr__(self, item): # this is being overridden so that we can access subvariables of the LoopArguments (i.e.: item.a) without # knowing the subvariable names ahead of time self.referenced_subvar_names.append(item) - return LoopArgumentVariable(self.name, item) + return LoopArgumentVariable(self.name, item, loop_args_op_name=self.op_name) def to_list_for_task_yaml(self): if isinstance(self.items_or_pipeline_param, (list, tuple)): @@ -86,20 +87,29 @@ def _make_name(cls, code: Text): return '{}-{}'.format(cls.LOOP_ITEM_PARAM_NAME_BASE, code) @classmethod - def name_is_loop_arguments(cls, param_name: Text) -> bool: + def name_is_withitems_loop_argument(cls, param_name: Text) -> bool: """Return True if the given parameter name looks like it came from a loop arguments parameter.""" return re.match( '%s-[0-9a-f]{%s}' % (cls.LOOP_ITEM_PARAM_NAME_BASE, cls.NUM_CODE_CHARS), 
param_name, ) is not None + @classmethod + def name_is_withparams_loop_argument(cls, param_name: Text) -> bool: + """Return True if the given parameter name looks like it came from a withParams loop item.""" + return ('-' + cls.LOOP_ITEM_NAME_BASE) in param_name class LoopArgumentVariable(dsl.PipelineParam): """Represents a subvariable for loop arguments. This is used for cases where we're looping over maps, each of which contains several variables.""" SUBVAR_NAME_DELIMITER = '-subvar-' - def __init__(self, loop_args_name: Text, this_variable_name: Text): + def __init__( + self, + loop_args_name: Text, + this_variable_name: Text, + loop_args_op_name: Text, + ): """ If the user ran: with dsl.ParallelFor([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]) as item: @@ -111,7 +121,11 @@ def __init__(self, loop_args_name: Text, this_variable_name: Text): this_variable_name: the name of this subvariable, which is the name of the dict key that spawned this subvariable. """ - super().__init__(name=self.get_name(loop_args_name=loop_args_name, this_variable_name=this_variable_name)) + super().__init__( + name=self.get_name(loop_args_name=loop_args_name, + this_variable_name=this_variable_name), + op_name=loop_args_op_name, + ) @classmethod def get_name(cls, loop_args_name: Text, this_variable_name: Text) -> Text: diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index b87ebe054d5..5f12f4c18de 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -783,6 +783,9 @@ def test_withparam_output_dict(self): def test_withparam_lightweight_out(self): self._test_py_compile_yaml('loop_over_lightweight_output') + def test_parallelfor_item_argument_resolving(self): + self._test_py_compile_yaml('parallelfor_item_argument_resolving') + def test_py_input_artifact_raw_value(self): """Test pipeline input_artifact_raw_value.""" self._test_py_compile_yaml('input_artifact_raw_value') diff --git 
a/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml b/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml index 4ebc3a2dc26..6139c6503df 100644 --- a/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml +++ b/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml @@ -36,13 +36,13 @@ - |- echo - |- - {{inputs.parameters.produce-list-data_list}} + {{inputs.parameters.produce-list-data_list-loop-item}} "image": |- busybox "inputs": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "metadata": "annotations": "pipelines.kubeflow.org/component_spec": |- @@ -54,9 +54,9 @@ - "arguments": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "value": |- - {{inputs.parameters.produce-list-data_list}} + {{inputs.parameters.produce-list-data_list-loop-item}} "name": |- consume-data "template": |- @@ -64,7 +64,7 @@ "inputs": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "name": |- for-loop-for-loop-00000001-1 - "dag": @@ -72,7 +72,7 @@ - "arguments": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "value": |- {{item}} "dependencies": diff --git a/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py new file mode 100644 index 00000000000..ebcb8c28024 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import NamedTuple + +import kfp +from kfp.components import func_to_container_op + + +# Stabilizing the test output +class StableIDGenerator: + def __init__(self, ): + self._index = 0 + + def get_next_id(self, ): + self._index += 1 + return '{code:0{num_chars:}d}'.format(code=self._index, num_chars=kfp.dsl._for_loop.LoopArguments.NUM_CODE_CHARS) + +kfp.dsl.ParallelFor._get_unique_id_code = StableIDGenerator().get_next_id + + +@func_to_container_op +def produce_str() -> str: + return "Hello" + + +@func_to_container_op +def produce_list_of_dicts() -> list: + return ([{"aaa": "aaa1", "bbb": "bbb1"}, {"aaa": "aaa2", "bbb": "bbb2"}],) + + +@func_to_container_op +def produce_list_of_strings() -> list: + return (["a", "z"],) + + +@func_to_container_op +def produce_list_of_ints() -> list: + return ([1234567890, 987654321],) + + +@func_to_container_op +def consume(param1): + print(param1) + + +@kfp.dsl.pipeline() +def parallelfor_item_argument_resolving(): + produce_str_task = produce_str() + produce_list_of_strings_task = produce_list_of_strings() + produce_list_of_ints_task = produce_list_of_ints() + produce_list_of_dicts_task = produce_list_of_dicts() + + with kfp.dsl.ParallelFor(produce_list_of_strings_task.output) as loop_item: + consume(produce_list_of_strings_task.output) + consume(loop_item) + consume(produce_str_task.output) + + with kfp.dsl.ParallelFor(produce_list_of_ints_task.output) as loop_item: + consume(produce_list_of_ints_task.output) + consume(loop_item) + + with kfp.dsl.ParallelFor(produce_list_of_dicts_task.output) as loop_item: + 
consume(produce_list_of_dicts_task.output) + #consume(loop_item) # Cannot use the full loop item when it's a dict + consume(loop_item.aaa) + + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(parallelfor_item_argument_resolving, __file__ + '.yaml') + diff --git a/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml new file mode 100644 index 00000000000..8c9ef5ad996 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml @@ -0,0 +1,723 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: "{\"name\": \"Parallelfor item argument resolving pipeline\"}" + generateName: parallelfor-item-argument-resolving- +spec: + arguments: + parameters: [] + entrypoint: parallelfor-item-argument-resolving + serviceAccountName: pipeline-runner + templates: + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-strings-output}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-strings-output + metadata: + 
annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-strings-output-loop-item}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-strings-output-loop-item + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-2 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-str-output}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + 
os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-str-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-3 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-ints-output}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-ints-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-4 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = 
_parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-ints-output-loop-item + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-5 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-dicts-output}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-dicts-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-6 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-dicts-output-loop-item-subvar-aaa}}" + command: + - python3 + - "-u" + - "-c" + 
- | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-7 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-strings-output + value: "{{inputs.parameters.produce-list-of-strings-output}}" + name: consume + template: consume + - + arguments: + parameters: + - + name: produce-list-of-strings-output-loop-item + value: "{{inputs.parameters.produce-list-of-strings-output-loop-item}}" + name: consume-2 + template: consume-2 + - + arguments: + parameters: + - + name: produce-str-output + value: "{{inputs.parameters.produce-str-output}}" + name: consume-3 + template: consume-3 + inputs: + parameters: + - + name: produce-list-of-strings-output + - + name: produce-list-of-strings-output-loop-item + - + name: produce-str-output + name: for-loop-for-loop-00000001-1 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{inputs.parameters.produce-list-of-ints-output}}" + name: consume-4 + template: consume-4 + - + arguments: + parameters: + - + name: 
produce-list-of-ints-output-loop-item + value: "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" + name: consume-5 + template: consume-5 + inputs: + parameters: + - + name: produce-list-of-ints-output + - + name: produce-list-of-ints-output-loop-item + name: for-loop-for-loop-00000002-2 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-dicts-output + value: "{{inputs.parameters.produce-list-of-dicts-output}}" + name: consume-6 + template: consume-6 + - + arguments: + parameters: + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + value: "{{inputs.parameters.produce-list-of-dicts-output-loop-item-subvar-aaa}}" + name: consume-7 + template: consume-7 + inputs: + parameters: + - + name: produce-list-of-dicts-output + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + name: for-loop-for-loop-00000003-3 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-strings-output + value: "{{tasks.produce-list-of-strings.outputs.parameters.produce-list-of-strings-output}}" + - + name: produce-list-of-strings-output-loop-item + value: "{{item}}" + - + name: produce-str-output + value: "{{tasks.produce-str.outputs.parameters.produce-str-output}}" + dependencies: + - produce-list-of-strings + - produce-str + name: for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 + withParam: "{{tasks.produce-list-of-strings.outputs.parameters.produce-list-of-strings-output}}" + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" + - + name: produce-list-of-ints-output-loop-item + value: "{{item}}" + dependencies: + - produce-list-of-ints + name: for-loop-for-loop-00000002-2 + template: for-loop-for-loop-00000002-2 + withParam: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" + - + arguments: + parameters: + - + name: produce-list-of-dicts-output + value: 
"{{tasks.produce-list-of-dicts.outputs.parameters.produce-list-of-dicts-output}}" + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + value: "{{item.aaa}}" + dependencies: + - produce-list-of-dicts + name: for-loop-for-loop-00000003-3 + template: for-loop-for-loop-00000003-3 + withParam: "{{tasks.produce-list-of-dicts.outputs.parameters.produce-list-of-dicts-output}}" + - + name: produce-list-of-dicts + template: produce-list-of-dicts + - + name: produce-list-of-ints + template: produce-list-of-ints + - + name: produce-list-of-strings + template: produce-list-of-strings + - + name: produce-str + template: produce-str + name: parallelfor-item-argument-resolving + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: + - python3 + - "-u" + - "-c" + - | + def produce_list_of_dicts() -> list: + return ([{"aaa": "aaa1", "bbb": "bbb1"}, {"aaa": "aaa2", "bbb": "bbb2"}],) + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." 
% obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list of dicts', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list_of_dicts(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of dicts\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-dicts + outputs: + artifacts: + - + name: produce-list-of-dicts-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-list-of-dicts-output + valueFrom: + path: /tmp/outputs/Output/data + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: + - python3 + - "-u" + - "-c" + - | + def produce_list_of_ints() -> list: + return ([1234567890, 987654321],) + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." 
% obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list of ints', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list_of_ints(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of ints\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-ints + outputs: + artifacts: + - + name: produce-list-of-ints-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-list-of-ints-output + valueFrom: + path: /tmp/outputs/Output/data + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: + - python3 + - "-u" + - "-c" + - | + def produce_list_of_strings() -> list: + return (["a", "z"],) + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." 
% obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list of strings', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list_of_strings(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of strings\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-strings + outputs: + artifacts: + - + name: produce-list-of-strings-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-list-of-strings-output + valueFrom: + path: /tmp/outputs/Output/data + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: + - python3 + - "-u" + - "-c" + - | + def produce_str() -> str: + return "Hello" + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='Produce str', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_str(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or 
isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce str\", \"outputs\": [{\"name\": \"Output\", \"type\": \"String\"}]}" + name: produce-str + outputs: + artifacts: + - + name: produce-str-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-str-output + valueFrom: + path: /tmp/outputs/Output/data diff --git a/sdk/python/tests/compiler/testdata/withparam_global.yaml b/sdk/python/tests/compiler/testdata/withparam_global.yaml index d45d9527317..a60c4eaf0db 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global.yaml @@ -17,24 +17,24 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop - value: '{{inputs.parameters.loopidy_doop}}' + - name: loopidy_doop-loop-item + value: '{{inputs.parameters.loopidy_doop-loop-item}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item name: for-loop-for-loop-00000001-1 - container: args: - - 'echo no output global op1, item: {{inputs.parameters.loopidy_doop}}' + - 'echo no output global op1, item: {{inputs.parameters.loopidy_doop-loop-item}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item name: my-in-cop1 - container: args: @@ -68,7 +68,7 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item value: '{{item}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml 
b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml index 53da6e7f66d..f78f3c69b5b 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml @@ -17,24 +17,24 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop-subvar-a - value: '{{inputs.parameters.loopidy_doop-subvar-a}}' + - name: loopidy_doop-loop-item-subvar-a + value: '{{inputs.parameters.loopidy_doop-loop-item-subvar-a}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a name: for-loop-for-loop-00000001-1 - container: args: - - 'echo no output global op1, item.a: {{inputs.parameters.loopidy_doop-subvar-a}}' + - 'echo no output global op1, item.a: {{inputs.parameters.loopidy_doop-loop-item-subvar-a}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a name: my-in-cop1 - container: args: @@ -68,7 +68,7 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a value: '{{item.a}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_output.yaml b/sdk/python/tests/compiler/testdata/withparam_output.yaml index a3b24ac155b..8978d1d055b 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output.yaml @@ -14,24 +14,24 @@ spec: tasks: - arguments: parameters: - - name: my-out-cop0-out - value: '{{inputs.parameters.my-out-cop0-out}}' + - name: my-out-cop0-out-loop-item + value: '{{inputs.parameters.my-out-cop0-out-loop-item}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item name: for-loop-for-loop-00000001-1 - container: args: - - 'echo do output op1 item: {{inputs.parameters.my-out-cop0-out}}' + - 'echo do output op1 
item: {{inputs.parameters.my-out-cop0-out-loop-item}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item name: my-in-cop1 - container: args: @@ -65,7 +65,7 @@ spec: tasks: - arguments: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item value: '{{item}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml index 44772a46a61..89aa8bb4b2a 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml @@ -14,24 +14,24 @@ spec: tasks: - arguments: parameters: - - name: out-subvar-a - value: '{{inputs.parameters.out-subvar-a}}' + - name: my-out-cop0-out-loop-item-subvar-a + value: '{{inputs.parameters.my-out-cop0-out-loop-item-subvar-a}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a name: for-loop-for-loop-00000001-1 - container: args: - - 'echo do output op1 item.a: {{inputs.parameters.out-subvar-a}}' + - 'echo do output op1 item.a: {{inputs.parameters.my-out-cop0-out-loop-item-subvar-a}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a name: my-in-cop1 - container: args: @@ -65,7 +65,7 @@ spec: tasks: - arguments: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a value: '{{item.a}}' dependencies: - my-out-cop0