From 8bd6620bb63e29cdc5beac34ab3ab2b03511b64d Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Fri, 7 Feb 2020 20:04:29 -0800 Subject: [PATCH 1/4] SDK - Compiler - Fixed ParallelFor name clashes The ParallelFor argument reference resolving was really broken. The logic "worked" like this - of the name of the referenced output contained the name of the loop collection source output, then it was considered to be the reference to the loop item. This broke lots of scenarios especially in cases where there were multiple components with same output name (e.g. the default "Output" output name). The logic also did not distinguish between references to the loop collection item vs. references to the loop collection source itself. I've rewritten the argument resolving logic, to fix the issues. --- sdk/python/kfp/compiler/compiler.py | 32 +- sdk/python/kfp/dsl/_for_loop.py | 26 +- sdk/python/tests/compiler/compiler_tests.py | 3 + .../loop_over_lightweight_output.yaml | 12 +- .../testdata/parallelfor_name_clashes.py | 47 +++ .../testdata/parallelfor_name_clashes.yaml | 389 ++++++++++++++++++ .../compiler/testdata/withparam_global.yaml | 12 +- .../testdata/withparam_global_dict.yaml | 12 +- .../compiler/testdata/withparam_output.yaml | 12 +- .../testdata/withparam_output_dict.yaml | 12 +- 10 files changed, 508 insertions(+), 49 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py create mode 100644 sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index e0a4bde80ee..8729388290d 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -473,7 +473,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies): # i.e., rather than a static list, they are either the output of another task or were input # as global pipeline parameters - pipeline_param = sub_group.loop_args + pipeline_param = sub_group.loop_args.items_or_pipeline_param if pipeline_param.op_name is None: withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name else: @@ -528,19 +528,25 @@ def get_arguments_for_sub_group( else: argument_name = param_name - # default argument_value + special cases - argument_value = '{{inputs.parameters.%s}}' % param_name + # Preparing argument. It can be pipeline input reference, task output reference or loop item (or loop item attribute + sanitized_loop_arg_full_name = '---' if isinstance(sub_group, dsl.ParallelFor): - if sub_group.loop_args.name in param_name: - if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): - subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) - argument_value = '{{item.%s}}' % subvar_name - elif _for_loop.LoopArguments.name_is_loop_arguments(param_name) or sub_group.items_is_pipeline_param: - argument_value = '{{item}}' - else: - raise ValueError("Failed to match loop args with parameter. param_name: {}, ".format(param_name)) - elif dependent_name: - argument_value = '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + sanitized_loop_arg_full_name = sanitize_k8s_name(self._pipelineparam_full_name(sub_group.loop_args)) + arg_ref_full_name = sanitize_k8s_name(param_name) + # We only care about the reference to the current loop item, not the outer loops + if isinstance(sub_group, dsl.ParallelFor) and arg_ref_full_name.startswith(sanitized_loop_arg_full_name): + if arg_ref_full_name == sanitized_loop_arg_full_name: + argument_value = '{{item}}' + elif _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name): + subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name) + argument_value = '{{item.%s}}' % subvar_name + else: + raise ValueError("Argument seems to reference the loop item, but not the item itself and not some attribute of the item. param_name: {}, ".format(param_name)) + else: + if dependent_name: + argument_value = '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + else: + argument_value = '{{inputs.parameters.%s}}' % param_name arguments.append({ 'name': argument_name, diff --git a/sdk/python/kfp/dsl/_for_loop.py b/sdk/python/kfp/dsl/_for_loop.py index 7399852b0d8..666c0350589 100644 --- a/sdk/python/kfp/dsl/_for_loop.py +++ b/sdk/python/kfp/dsl/_for_loop.py @@ -10,6 +10,7 @@ class LoopArguments(dsl.PipelineParam): """Class representing the arguments that are looped over in a ParallelFor loop in the KFP DSL. This doesn't need to be instantiated by the end user, rather it will be automatically created by a ParallelFor ops group.""" + LOOP_ITEM_NAME_BASE = 'loop-item' LOOP_ITEM_PARAM_NAME_BASE = 'loop-item-param' # number of characters in the code which is passed to the constructor NUM_CODE_CHARS = 8 @@ -52,7 +53,7 @@ def __init__(self, items: Union[ItemList, dsl.PipelineParam], code: Text, name_o if not self._subvar_name_is_legal(subvar_name): raise ValueError("Tried to create subvariable named {} but that's not a legal Python variable " "name.".format(subvar_name)) - setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name)) + setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name, loop_args_op_name=self.op_name)) self.items_or_pipeline_param = items self.referenced_subvar_names = [] @@ -62,7 +63,7 @@ def from_pipeline_param(cls, param: dsl.PipelineParam) -> 'LoopArguments': return LoopArguments( items=param, code=None, - name_override=param.name, + name_override=param.name + '-' + cls.LOOP_ITEM_NAME_BASE, op_name=param.op_name, value=param.value, ) @@ -71,7 +72,7 @@ def __getattr__(self, item): # this is being overridden so that we can access subvariables of the LoopArguments (i.e.: item.a) without # knowing the subvariable names ahead of time self.referenced_subvar_names.append(item) - return LoopArgumentVariable(self.name, item) + return LoopArgumentVariable(self.name, item, loop_args_op_name=self.op_name) def to_list_for_task_yaml(self): if isinstance(self.items_or_pipeline_param, (list, tuple)): @@ -86,20 +87,29 @@ def _make_name(cls, code: Text): return '{}-{}'.format(cls.LOOP_ITEM_PARAM_NAME_BASE, code) @classmethod - def name_is_loop_arguments(cls, param_name: Text) -> bool: + def name_is_withitems_loop_argument(cls, param_name: Text) -> bool: """Return True if the given parameter name looks like it came from a loop arguments parameter.""" return re.match( '%s-[0-9a-f]{%s}' % (cls.LOOP_ITEM_PARAM_NAME_BASE, cls.NUM_CODE_CHARS), param_name, ) is not None + @classmethod + def name_is_withparams_loop_argument(cls, param_name: Text) -> bool: + """Return True if the given parameter name looks like it came from a withParams loop item.""" + return ('-' + cls.LOOP_ITEM_NAME_BASE) in param_name class LoopArgumentVariable(dsl.PipelineParam): """Represents a subvariable for loop arguments. This is used for cases where we're looping over maps, each of which contains several variables.""" SUBVAR_NAME_DELIMITER = '-subvar-' - def __init__(self, loop_args_name: Text, this_variable_name: Text): + def __init__( + self, + loop_args_name: Text, + this_variable_name: Text, + loop_args_op_name: Text, + ): """ If the user ran: with dsl.ParallelFor([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]) as item: @@ -111,7 +121,11 @@ def __init__(self, loop_args_name: Text, this_variable_name: Text): this_variable_name: the name of this subvariable, which is the name of the dict key that spawned this subvariable. """ - super().__init__(name=self.get_name(loop_args_name=loop_args_name, this_variable_name=this_variable_name)) + super().__init__( + name=self.get_name(loop_args_name=loop_args_name, + this_variable_name=this_variable_name), + op_name=loop_args_op_name, + ) @classmethod def get_name(cls, loop_args_name: Text, this_variable_name: Text) -> Text: diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index b87ebe054d5..81549370a64 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -783,6 +783,9 @@ def test_withparam_output_dict(self): def test_withparam_lightweight_out(self): self._test_py_compile_yaml('loop_over_lightweight_output') + def test_parallelfor_name_clashes(self): + self._test_py_compile_yaml('parallelfor_name_clashes') + def test_py_input_artifact_raw_value(self): """Test pipeline input_artifact_raw_value.""" self._test_py_compile_yaml('input_artifact_raw_value') diff --git a/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml b/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml index 4ebc3a2dc26..6139c6503df 100644 --- a/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml +++ b/sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml @@ -36,13 +36,13 @@ - |- echo - |- - {{inputs.parameters.produce-list-data_list}} + {{inputs.parameters.produce-list-data_list-loop-item}} "image": |- busybox "inputs": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "metadata": "annotations": "pipelines.kubeflow.org/component_spec": |- @@ -54,9 +54,9 @@ - "arguments": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "value": |- - {{inputs.parameters.produce-list-data_list}} + {{inputs.parameters.produce-list-data_list-loop-item}} "name": |- consume-data "template": |- @@ -64,7 +64,7 @@ "inputs": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "name": |- for-loop-for-loop-00000001-1 - "dag": @@ -72,7 +72,7 @@ - "arguments": "parameters": - "name": |- - produce-list-data_list + produce-list-data_list-loop-item "value": |- {{item}} "dependencies": diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py new file mode 100644 index 00000000000..f66effed5d2 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import NamedTuple + +import kfp +from kfp.components import func_to_container_op + +@func_to_container_op +def produce_str() -> str: + return "Hello" + +@func_to_container_op +def produce_list() -> list: + return ["1", "2"] + +@func_to_container_op +def consume(param1): + print(param1) + +@kfp.dsl.pipeline() +def parallelfor_name_clashes_pipeline(): + produce_str_task = produce_str() + produce_list_task = produce_list() + with kfp.dsl.ParallelFor(produce_list_task.output) as loop_item: + consume(produce_list_task.output) + consume(produce_str_task.output) + consume(loop_item) + consume(loop_item.aaa) + + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(parallelfor_name_clashes_pipeline, __file__ + '.yaml') + diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml new file mode 100644 index 00000000000..e4ee32533f7 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml @@ -0,0 +1,389 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"name": "My pipeline"}' + generateName: my-pipeline- +spec: + arguments: + parameters: [] + entrypoint: my-pipeline + serviceAccountName: pipeline-runner + templates: + - + name: consume + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' + inputs: + parameters: + - + name: produce-list-output + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx]))] + args: + - "--param1" + - "{{inputs.parameters.produce-list-output}}" + - + name: consume-2 + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' + inputs: + parameters: + - + name: produce-str-output + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - "--param1" + - "{{inputs.parameters.produce-str-output}}" + - + name: consume-3 + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' + inputs: + parameters: + - + name: produce-list-output-loop-item + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - "--param1" + - "{{inputs.parameters.produce-list-output-loop-item}}" + - + name: consume-4 + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' + inputs: + parameters: + - + name: produce-list-output-loop-item-subvar-aaa + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - "--param1" + - "{{inputs.parameters.produce-list-output-loop-item-subvar-aaa}}" + - + name: for-loop-for-loop-95e50821-1 + inputs: + parameters: + - + name: produce-list-output + - + name: produce-list-output-loop-item + - + name: produce-list-output-loop-item-subvar-aaa + - + name: produce-str-output + dag: + tasks: + - + name: consume + template: consume + arguments: + parameters: + - + name: produce-list-output + value: "{{inputs.parameters.produce-list-output}}" + - + name: consume-2 + template: consume-2 + arguments: + parameters: + - + name: produce-str-output + value: "{{inputs.parameters.produce-str-output}}" + - + name: consume-3 + template: consume-3 + arguments: + parameters: + - + name: produce-list-output-loop-item + value: "{{inputs.parameters.produce-list-output-loop-item}}" + - + name: consume-4 + template: consume-4 + arguments: + parameters: + - + name: produce-list-output-loop-item-subvar-aaa + value: "{{inputs.parameters.produce-list-output-loop-item-subvar-aaa}}" + - + name: my-pipeline + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-output + value: "{{tasks.produce-list.outputs.parameters.produce-list-output}}" + - + name: produce-list-output-loop-item + value: "{{item}}" + - + name: produce-list-output-loop-item-subvar-aaa + value: "{{item.aaa}}" + - + name: produce-str-output + value: "{{tasks.produce-str.outputs.parameters.produce-str-output}}" + dependencies: + - produce-list + - produce-str + name: for-loop-for-loop-95e50821-1 + template: for-loop-for-loop-95e50821-1 + withParam: "{{tasks.produce-list.outputs.parameters.produce-list-output}}" + - + name: produce-list + template: produce-list + - + name: produce-str + template: produce-str + - + name: produce-list + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"name": "Produce list", "outputs": [{"name": "Output", "type": "JsonArray"}]}' + outputs: + artifacts: + - + name: produce-list-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-list-output + valueFrom: + path: /tmp/outputs/Output/data + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def produce_list() -> list: + return ["1", "2"] + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - "----output-paths" + - /tmp/outputs/Output/data + - + name: produce-str + metadata: + annotations: + pipelines.kubeflow.org/component_spec: '{"name": "Produce str", "outputs": [{"name": "Output", "type": "String"}]}' + outputs: + artifacts: + - + name: produce-str-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-str-output + valueFrom: + path: /tmp/outputs/Output/data + container: + image: "tensorflow/tensorflow:1.13.2-py3" + command: + - python3 + - "-u" + - "-c" + - | + def produce_str() -> str: + return "Hello" + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='Produce str', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_str(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - "----output-paths" + - /tmp/outputs/Output/data diff --git a/sdk/python/tests/compiler/testdata/withparam_global.yaml b/sdk/python/tests/compiler/testdata/withparam_global.yaml index d45d9527317..a60c4eaf0db 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global.yaml @@ -17,24 +17,24 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop - value: '{{inputs.parameters.loopidy_doop}}' + - name: loopidy_doop-loop-item + value: '{{inputs.parameters.loopidy_doop-loop-item}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item name: for-loop-for-loop-00000001-1 - container: args: - - 'echo no output global op1, item: {{inputs.parameters.loopidy_doop}}' + - 'echo no output global op1, item: {{inputs.parameters.loopidy_doop-loop-item}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item name: my-in-cop1 - container: args: @@ -68,7 +68,7 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop + - name: loopidy_doop-loop-item value: '{{item}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml index 53da6e7f66d..f78f3c69b5b 100644 --- a/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_global_dict.yaml @@ -17,24 +17,24 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop-subvar-a - value: '{{inputs.parameters.loopidy_doop-subvar-a}}' + - name: loopidy_doop-loop-item-subvar-a + value: '{{inputs.parameters.loopidy_doop-loop-item-subvar-a}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a name: for-loop-for-loop-00000001-1 - container: args: - - 'echo no output global op1, item.a: {{inputs.parameters.loopidy_doop-subvar-a}}' + - 'echo no output global op1, item.a: {{inputs.parameters.loopidy_doop-loop-item-subvar-a}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a name: my-in-cop1 - container: args: @@ -68,7 +68,7 @@ spec: tasks: - arguments: parameters: - - name: loopidy_doop-subvar-a + - name: loopidy_doop-loop-item-subvar-a value: '{{item.a}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_output.yaml b/sdk/python/tests/compiler/testdata/withparam_output.yaml index a3b24ac155b..8978d1d055b 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output.yaml @@ -14,24 +14,24 @@ spec: tasks: - arguments: parameters: - - name: my-out-cop0-out - value: '{{inputs.parameters.my-out-cop0-out}}' + - name: my-out-cop0-out-loop-item + value: '{{inputs.parameters.my-out-cop0-out-loop-item}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item name: for-loop-for-loop-00000001-1 - container: args: - - 'echo do output op1 item: {{inputs.parameters.my-out-cop0-out}}' + - 'echo do output op1 item: {{inputs.parameters.my-out-cop0-out-loop-item}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item name: my-in-cop1 - container: args: @@ -65,7 +65,7 @@ spec: tasks: - arguments: parameters: - - name: my-out-cop0-out + - name: my-out-cop0-out-loop-item value: '{{item}}' dependencies: - my-out-cop0 diff --git a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml index 44772a46a61..89aa8bb4b2a 100644 --- a/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml +++ b/sdk/python/tests/compiler/testdata/withparam_output_dict.yaml @@ -14,24 +14,24 @@ spec: tasks: - arguments: parameters: - - name: out-subvar-a - value: '{{inputs.parameters.out-subvar-a}}' + - name: my-out-cop0-out-loop-item-subvar-a + value: '{{inputs.parameters.my-out-cop0-out-loop-item-subvar-a}}' name: my-in-cop1 template: my-in-cop1 inputs: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a name: for-loop-for-loop-00000001-1 - container: args: - - 'echo do output op1 item.a: {{inputs.parameters.out-subvar-a}}' + - 'echo do output op1 item.a: {{inputs.parameters.my-out-cop0-out-loop-item-subvar-a}}' command: - sh - -c image: library/bash:4.4.23 inputs: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a name: my-in-cop1 - container: args: @@ -65,7 +65,7 @@ spec: tasks: - arguments: parameters: - - name: out-subvar-a + - name: my-out-cop0-out-loop-item-subvar-a value: '{{item.a}}' dependencies: - my-out-cop0 From 80b9a785b6ebb3bd6ac9f37ba45572da5ddeee4f Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Mon, 10 Feb 2020 13:25:44 -0800 Subject: [PATCH 2/4] Argo cannot use {{item}} when withParams items are dicts --- .../testdata/parallelfor_name_clashes.py | 34 +- .../testdata/parallelfor_name_clashes.yaml | 664 +++++++++++++----- 2 files changed, 528 insertions(+), 170 deletions(-) diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py index f66effed5d2..418c079f1c2 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py @@ -22,22 +22,46 @@ def produce_str() -> str: return "Hello" + @func_to_container_op -def produce_list() -> list: - return ["1", "2"] +def produce_list_of_dicts() -> list: + return ([{"aaa": "aaa1", "bbb": "bbb1"}, {"aaa": "aaa2", "bbb": "bbb2"}],) + + +@func_to_container_op +def produce_list_of_strings() -> list: + return (["a", "z"],) + + +@func_to_container_op +def produce_list_of_ints() -> list: + return ([1234567890, 987654321],) + @func_to_container_op def consume(param1): print(param1) + @kfp.dsl.pipeline() def parallelfor_name_clashes_pipeline(): produce_str_task = produce_str() - produce_list_task = produce_list() - with kfp.dsl.ParallelFor(produce_list_task.output) as loop_item: - consume(produce_list_task.output) + produce_list_of_strings_task = produce_list_of_strings() + produce_list_of_ints_task = produce_list_of_ints() + produce_list_of_dicts_task = produce_list_of_dicts() + + with kfp.dsl.ParallelFor(produce_list_of_strings_task.output) as loop_item: + consume(produce_list_of_strings_task.output) + consume(loop_item) consume(produce_str_task.output) + + with kfp.dsl.ParallelFor(produce_list_of_ints_task.output) as loop_item: + consume(produce_list_of_ints_task.output) consume(loop_item) + + with kfp.dsl.ParallelFor(produce_list_of_dicts_task.output) as loop_item: + consume(produce_list_of_dicts_task.output) + #consume(loop_item) # Cannot use the full loop item when it's a dict consume(loop_item.aaa) diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml index e4ee32533f7..87cfb640e96 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml @@ -1,27 +1,21 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow -metadata: - annotations: - pipelines.kubeflow.org/pipeline_spec: '{"name": "My pipeline"}' - generateName: my-pipeline- -spec: - arguments: +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: "{\"name\": \"Parallelfor name clashes pipeline\"}" + generateName: parallelfor-name-clashes-pipeline- +spec: + arguments: parameters: [] - entrypoint: my-pipeline + entrypoint: parallelfor-name-clashes-pipeline serviceAccountName: pipeline-runner - templates: - - - name: consume - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' - inputs: - parameters: - - - name: produce-list-output - container: - image: "tensorflow/tensorflow:1.13.2-py3" - command: + templates: + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-strings-output}}" + command: - python3 - "-u" - "-c" @@ -51,22 +45,22 @@ spec: except OSError: pass with open(output_file, 'w') as f: - f.write(_output_serializers[idx](_outputs[idx]))] - args: - - "--param1" - - "{{inputs.parameters.produce-list-output}}" - - - name: consume-2 - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' - inputs: - parameters: - - - name: produce-str-output - container: + f.write(_output_serializers[idx](_outputs[idx])) image: "tensorflow/tensorflow:1.13.2-py3" - command: + inputs: + parameters: + - + name: produce-list-of-strings-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-strings-output-loop-item}}" + command: - python3 - "-u" - "-c" @@ -97,21 +91,21 @@ spec: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-strings-output-loop-item + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-2 + - + container: + args: - "--param1" - "{{inputs.parameters.produce-str-output}}" - - - name: consume-3 - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' - inputs: - parameters: - - - name: produce-list-output-loop-item - container: - image: "tensorflow/tensorflow:1.13.2-py3" - command: + command: - python3 - "-u" - "-c" @@ -142,21 +136,111 @@ spec: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-str-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-3 + - + container: + args: - "--param1" - - "{{inputs.parameters.produce-list-output-loop-item}}" - - + - "{{inputs.parameters.produce-list-of-ints-output}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-ints-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" name: consume-4 - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "param1"}], "name": "Consume"}' - inputs: - parameters: - - - name: produce-list-output-loop-item-subvar-aaa - container: + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) image: "tensorflow/tensorflow:1.13.2-py3" - command: + inputs: + parameters: + - + name: produce-list-of-ints-output-loop-item + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-5 + - + container: + args: + - "--param1" + - "{{inputs.parameters.produce-list-of-dicts-output}}" + command: - python3 - "-u" - "-c" @@ -187,110 +271,285 @@ spec: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-dicts-output + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-6 + - + container: + args: - "--param1" - - "{{inputs.parameters.produce-list-output-loop-item-subvar-aaa}}" - - - name: for-loop-for-loop-95e50821-1 - inputs: - parameters: - - - name: produce-list-output - - - name: produce-list-output-loop-item - - - name: produce-list-output-loop-item-subvar-aaa - - - name: produce-str-output - dag: - tasks: - - + - "{{inputs.parameters.produce-list-of-dicts-output-loop-item-subvar-aaa}}" + command: + - python3 + - "-u" + - "-c" + - | + def consume(param1): + print(param1) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume', description='') + _parser.add_argument("--param1", dest="param1", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = consume(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + inputs: + parameters: + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" + name: consume-7 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{inputs.parameters.produce-list-of-ints-output}}" + name: consume-4 + template: consume-4 + - + arguments: + parameters: + - + name: produce-list-of-ints-output-loop-item + value: "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" + name: consume-5 + template: consume-5 + inputs: + parameters: + - + name: produce-list-of-ints-output + - + name: produce-list-of-ints-output-loop-item + name: for-loop-for-loop-1c754d58-2 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-strings-output + value: "{{inputs.parameters.produce-list-of-strings-output}}" name: consume template: consume - arguments: - parameters: - - - name: produce-list-output - value: "{{inputs.parameters.produce-list-output}}" - - + - + arguments: + parameters: + - + name: produce-list-of-strings-output-loop-item + value: "{{inputs.parameters.produce-list-of-strings-output-loop-item}}" name: consume-2 template: consume-2 - arguments: - parameters: - - + - + arguments: + parameters: + - name: produce-str-output value: "{{inputs.parameters.produce-str-output}}" - - name: consume-3 template: consume-3 - arguments: - parameters: - - - name: produce-list-output-loop-item - value: "{{inputs.parameters.produce-list-output-loop-item}}" - - - name: consume-4 - template: consume-4 - arguments: - parameters: - - - name: produce-list-output-loop-item-subvar-aaa - value: "{{inputs.parameters.produce-list-output-loop-item-subvar-aaa}}" - - - name: my-pipeline - dag: - tasks: - - - arguments: - parameters: - - - name: produce-list-output - value: "{{tasks.produce-list.outputs.parameters.produce-list-output}}" - - - name: produce-list-output-loop-item + inputs: + parameters: + - + name: produce-list-of-strings-output + - + name: produce-list-of-strings-output-loop-item + - + name: produce-str-output + name: for-loop-for-loop-82cbc3d1-1 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-dicts-output + value: "{{inputs.parameters.produce-list-of-dicts-output}}" + name: consume-6 + template: consume-6 + - + arguments: + parameters: + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + value: "{{inputs.parameters.produce-list-of-dicts-output-loop-item-subvar-aaa}}" + name: consume-7 + template: consume-7 + inputs: + parameters: + - + name: produce-list-of-dicts-output + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + name: for-loop-for-loop-b57fd6b3-3 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" + - + name: produce-list-of-ints-output-loop-item value: "{{item}}" - - - name: produce-list-output-loop-item-subvar-aaa - value: "{{item.aaa}}" - - + dependencies: + - produce-list-of-ints + name: for-loop-for-loop-1c754d58-2 + template: for-loop-for-loop-1c754d58-2 + withParam: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" + - + arguments: + parameters: + - + name: produce-list-of-strings-output + value: "{{tasks.produce-list-of-strings.outputs.parameters.produce-list-of-strings-output}}" + - + name: produce-list-of-strings-output-loop-item + value: "{{item}}" + - name: produce-str-output value: "{{tasks.produce-str.outputs.parameters.produce-str-output}}" - dependencies: - - produce-list + dependencies: + - produce-list-of-strings - produce-str - name: for-loop-for-loop-95e50821-1 - template: for-loop-for-loop-95e50821-1 - withParam: "{{tasks.produce-list.outputs.parameters.produce-list-output}}" - - - name: produce-list - template: produce-list - - + name: for-loop-for-loop-82cbc3d1-1 + template: for-loop-for-loop-82cbc3d1-1 + withParam: "{{tasks.produce-list-of-strings.outputs.parameters.produce-list-of-strings-output}}" + - + arguments: + parameters: + - + name: produce-list-of-dicts-output + value: "{{tasks.produce-list-of-dicts.outputs.parameters.produce-list-of-dicts-output}}" + - + name: produce-list-of-dicts-output-loop-item-subvar-aaa + value: "{{item.aaa}}" + dependencies: + - produce-list-of-dicts + name: for-loop-for-loop-b57fd6b3-3 + template: for-loop-for-loop-b57fd6b3-3 + withParam: "{{tasks.produce-list-of-dicts.outputs.parameters.produce-list-of-dicts-output}}" + - + name: produce-list-of-dicts + template: produce-list-of-dicts + - + name: produce-list-of-ints + template: produce-list-of-ints + - + name: produce-list-of-strings + template: produce-list-of-strings + - name: produce-str template: produce-str - - - name: produce-list - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"name": "Produce list", "outputs": [{"name": "Output", "type": "JsonArray"}]}' - outputs: - artifacts: - - - name: produce-list-output + name: parallelfor-name-clashes-pipeline + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: + - python3 + - "-u" + - "-c" + - | + def produce_list_of_dicts() -> list: + return ([{"aaa": "aaa1", "bbb": "bbb1"}, {"aaa": "aaa2", "bbb": "bbb2"}],) + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list of dicts', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list_of_dicts(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of dicts\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-dicts + outputs: + artifacts: + - + name: produce-list-of-dicts-output path: /tmp/outputs/Output/data - parameters: - - - name: produce-list-output - valueFrom: + parameters: + - + name: produce-list-of-dicts-output + valueFrom: path: /tmp/outputs/Output/data - container: - image: "tensorflow/tensorflow:1.13.2-py3" - command: + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: - python3 - "-u" - "-c" - | - def produce_list() -> list: - return ["1", "2"] + def produce_list_of_ints() -> list: + return ([1234567890, 987654321],) def _serialize_json(obj) -> str: if isinstance(obj, str): @@ -304,12 +563,12 @@ spec: return json.dumps(obj, default=default_serializer) import argparse - _parser = argparse.ArgumentParser(prog='Produce list', description='') + _parser = argparse.ArgumentParser(prog='Produce list of ints', description='') _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) - _outputs = produce_list(**_parsed_args) + _outputs = produce_list_of_ints(**_parsed_args) if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): _outputs = [_outputs] @@ -327,27 +586,90 @@ spec: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of ints\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-ints + outputs: + artifacts: + - + name: produce-list-of-ints-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-list-of-ints-output + valueFrom: + path: /tmp/outputs/Output/data + - + container: + args: - "----output-paths" - /tmp/outputs/Output/data - - - name: produce-str - metadata: - annotations: - pipelines.kubeflow.org/component_spec: '{"name": "Produce str", "outputs": [{"name": "Output", "type": "String"}]}' - outputs: - artifacts: - - - name: produce-str-output + command: + - python3 + - "-u" + - "-c" + - | + def produce_list_of_strings() -> list: + return (["a", "z"],) + + def _serialize_json(obj) -> str: + if isinstance(obj, str): + return obj + import json + def default_serializer(obj): + if hasattr(obj, 'to_struct'): + return obj.to_struct() + else: + raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__) + return json.dumps(obj, default=default_serializer) + + import argparse + _parser = argparse.ArgumentParser(prog='Produce list of strings', description='') + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = produce_list_of_strings(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_json, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce list of strings\", \"outputs\": [{\"name\": \"Output\", \"type\": \"JsonArray\"}]}" + name: produce-list-of-strings + outputs: + artifacts: + - + name: produce-list-of-strings-output path: /tmp/outputs/Output/data - parameters: - - - name: produce-str-output - valueFrom: + parameters: + - + name: produce-list-of-strings-output + valueFrom: path: /tmp/outputs/Output/data - container: - image: "tensorflow/tensorflow:1.13.2-py3" - command: + - + container: + args: + - "----output-paths" + - /tmp/outputs/Output/data + command: - python3 - "-u" - "-c" @@ -384,6 +706,18 @@ spec: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: - - "----output-paths" - - /tmp/outputs/Output/data + image: "tensorflow/tensorflow:1.13.2-py3" + metadata: + annotations: + pipelines.kubeflow.org/component_spec: "{\"name\": \"Produce str\", \"outputs\": [{\"name\": \"Output\", \"type\": \"String\"}]}" + name: produce-str + outputs: + artifacts: + - + name: produce-str-output + path: /tmp/outputs/Output/data + parameters: + - + name: produce-str-output + valueFrom: + path: /tmp/outputs/Output/data From 5138b5bfdbaba2b3d9ae7fc503e84979c88fab92 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Mon, 10 Feb 2020 14:17:44 -0800 Subject: [PATCH 3/4] Stabilize the loop template names --- .../testdata/parallelfor_name_clashes.py | 13 +++ .../testdata/parallelfor_name_clashes.yaml | 92 +++++++++---------- 2 files changed, 59 insertions(+), 46 deletions(-) diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py index 418c079f1c2..ebcc2ee01e4 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py @@ -18,6 +18,19 @@ import kfp from kfp.components import func_to_container_op + +# Stabilizing the test output +class StableIDGenerator: + def __init__(self, ): + self._index = 0 + + def get_next_id(self, ): + self._index += 1 + return '{code:0{num_chars:}d}'.format(code=self._index, num_chars=kfp.dsl._for_loop.LoopArguments.NUM_CODE_CHARS) + +kfp.dsl.ParallelFor._get_unique_id_code = StableIDGenerator().get_next_id + + @func_to_container_op def produce_str() -> str: return "Hello" diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml index 87cfb640e96..4545925f8de 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml +++ b/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml @@ -325,32 +325,6 @@ spec: annotations: pipelines.kubeflow.org/component_spec: "{\"inputs\": [{\"name\": \"param1\"}], \"name\": \"Consume\"}" name: consume-7 - - - dag: - tasks: - - - arguments: - parameters: - - - name: produce-list-of-ints-output - value: "{{inputs.parameters.produce-list-of-ints-output}}" - name: consume-4 - template: consume-4 - - - arguments: - parameters: - - - name: produce-list-of-ints-output-loop-item - value: "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" - name: consume-5 - template: consume-5 - inputs: - parameters: - - - name: produce-list-of-ints-output - - - name: produce-list-of-ints-output-loop-item - name: for-loop-for-loop-1c754d58-2 - dag: tasks: @@ -386,7 +360,33 @@ spec: name: produce-list-of-strings-output-loop-item - name: produce-str-output - name: for-loop-for-loop-82cbc3d1-1 + name: for-loop-for-loop-00000001-1 + - + dag: + tasks: + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{inputs.parameters.produce-list-of-ints-output}}" + name: consume-4 + template: consume-4 + - + arguments: + parameters: + - + name: produce-list-of-ints-output-loop-item + value: "{{inputs.parameters.produce-list-of-ints-output-loop-item}}" + name: consume-5 + template: consume-5 + inputs: + parameters: + - + name: produce-list-of-ints-output + - + name: produce-list-of-ints-output-loop-item + name: for-loop-for-loop-00000002-2 - dag: tasks: @@ -412,24 +412,10 @@ spec: name: produce-list-of-dicts-output - name: produce-list-of-dicts-output-loop-item-subvar-aaa - name: for-loop-for-loop-b57fd6b3-3 + name: for-loop-for-loop-00000003-3 - dag: tasks: - - - arguments: - parameters: - - - name: produce-list-of-ints-output - value: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" - - - name: produce-list-of-ints-output-loop-item - value: "{{item}}" - dependencies: - - produce-list-of-ints - name: for-loop-for-loop-1c754d58-2 - template: for-loop-for-loop-1c754d58-2 - withParam: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" - arguments: parameters: @@ -445,9 +431,23 @@ spec: dependencies: - produce-list-of-strings - produce-str - name: for-loop-for-loop-82cbc3d1-1 - template: for-loop-for-loop-82cbc3d1-1 + name: for-loop-for-loop-00000001-1 + template: for-loop-for-loop-00000001-1 withParam: "{{tasks.produce-list-of-strings.outputs.parameters.produce-list-of-strings-output}}" + - + arguments: + parameters: + - + name: produce-list-of-ints-output + value: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" + - + name: produce-list-of-ints-output-loop-item + value: "{{item}}" + dependencies: + - produce-list-of-ints + name: for-loop-for-loop-00000002-2 + template: for-loop-for-loop-00000002-2 + withParam: "{{tasks.produce-list-of-ints.outputs.parameters.produce-list-of-ints-output}}" - arguments: parameters: @@ -459,8 +459,8 @@ spec: value: "{{item.aaa}}" dependencies: - produce-list-of-dicts - name: for-loop-for-loop-b57fd6b3-3 - template: for-loop-for-loop-b57fd6b3-3 + name: for-loop-for-loop-00000003-3 + template: for-loop-for-loop-00000003-3 withParam: "{{tasks.produce-list-of-dicts.outputs.parameters.produce-list-of-dicts-output}}" - name: produce-list-of-dicts From 5f626c1880e9f0793720f8cd4d733eaafbe68408 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Mon, 10 Feb 2020 14:29:51 -0800 Subject: [PATCH 4/4] Renamed the test case --- sdk/python/tests/compiler/compiler_tests.py | 4 ++-- ..._clashes.py => parallelfor_item_argument_resolving.py} | 4 ++-- ...shes.yaml => parallelfor_item_argument_resolving.yaml} | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) rename sdk/python/tests/compiler/testdata/{parallelfor_name_clashes.py => parallelfor_item_argument_resolving.py} (94%) rename sdk/python/tests/compiler/testdata/{parallelfor_name_clashes.yaml => parallelfor_item_argument_resolving.yaml} (99%) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 81549370a64..5f12f4c18de 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -783,8 +783,8 @@ def test_withparam_output_dict(self): def test_withparam_lightweight_out(self): self._test_py_compile_yaml('loop_over_lightweight_output') - def test_parallelfor_name_clashes(self): - self._test_py_compile_yaml('parallelfor_name_clashes') + def test_parallelfor_item_argument_resolving(self): + self._test_py_compile_yaml('parallelfor_item_argument_resolving') def test_py_input_artifact_raw_value(self): """Test pipeline input_artifact_raw_value.""" diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py similarity index 94% rename from sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py rename to sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py index ebcc2ee01e4..ebcb8c28024 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.py +++ b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py @@ -57,7 +57,7 @@ def consume(param1): @kfp.dsl.pipeline() -def parallelfor_name_clashes_pipeline(): +def parallelfor_item_argument_resolving(): produce_str_task = produce_str() produce_list_of_strings_task = produce_list_of_strings() produce_list_of_ints_task = produce_list_of_ints() @@ -80,5 +80,5 @@ def parallelfor_name_clashes_pipeline(): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(parallelfor_name_clashes_pipeline, __file__ + '.yaml') + compiler.Compiler().compile(parallelfor_item_argument_resolving, __file__ + '.yaml') diff --git a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml similarity index 99% rename from sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml rename to sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml index 4545925f8de..8c9ef5ad996 100644 --- a/sdk/python/tests/compiler/testdata/parallelfor_name_clashes.yaml +++ b/sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.yaml @@ -2,12 +2,12 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: - pipelines.kubeflow.org/pipeline_spec: "{\"name\": \"Parallelfor name clashes pipeline\"}" - generateName: parallelfor-name-clashes-pipeline- + pipelines.kubeflow.org/pipeline_spec: "{\"name\": \"Parallelfor item argument resolving pipeline\"}" + generateName: parallelfor-item-argument-resolving- spec: arguments: parameters: [] - entrypoint: parallelfor-name-clashes-pipeline + entrypoint: parallelfor-item-argument-resolving serviceAccountName: pipeline-runner templates: - @@ -474,7 +474,7 @@ spec: - name: produce-str template: produce-str - name: parallelfor-name-clashes-pipeline + name: parallelfor-item-argument-resolving - container: args: