From 7855d0201f1902a706b1cd07c2da627b7ca58796 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Tue, 22 Jan 2019 02:30:39 -0800
Subject: [PATCH 01/10] SDK/Components - Creating graph components from python pipeline function

`create_graph_component_from_pipeline_func` converts a Python pipeline function
to a graph component object that can be saved, shared, composed or submitted
for execution.

Example:

    producer_op = load_component(component_with_0_inputs_and_2_outputs)
    processor_op = load_component(component_with_2_inputs_and_2_outputs)

    def pipeline1(pipeline_param_1: int):
        producer_task = producer_op()
        processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2'])

        return OrderedDict([
            ('Pipeline output 1', producer_task.outputs['Output 1']),
            ('Pipeline output 2', processor_task.outputs['Output 2']),
        ])

    graph_component = create_graph_component_from_pipeline_func(pipeline1)
---
 .../components/_python_to_graph_component.py  | 108 ++++++++++++++++
 ...test_python_pipeline_to_graph_component.py | 122 ++++++++++++++++++
 2 files changed, 230 insertions(+)
 create mode 100644 sdk/python/kfp/components/_python_to_graph_component.py
 create mode 100644 sdk/python/tests/components/test_python_pipeline_to_graph_component.py

diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py
new file mode 100644
index 00000000000..96b856b3c7c
--- /dev/null
+++ b/sdk/python/kfp/components/_python_to_graph_component.py
@@ -0,0 +1,108 @@
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__all__ = [
+    'create_graph_component_from_pipeline_func',
+]
+
+
+import inspect
+from collections import OrderedDict
+
+import kfp.components as comp
+from kfp.components._structures import TaskSpec, ComponentSpec, TaskOutputReference, InputSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec
+from kfp.components._naming import _make_name_unique_by_adding_index, generate_unique_name_conversion_table, _sanitize_python_function_name, _convert_to_human_name
+
+
+def create_graph_component_from_pipeline_func(pipeline_func) -> ComponentSpec:
+    '''Converts python pipeline function to a graph component object that can be saved, shared, composed or submitted for execution.
+ + Example: + + producer_op = load_component(component_with_0_inputs_and_2_outputs) + processor_op = load_component(component_with_2_inputs_and_2_outputs) + + def pipeline1(pipeline_param_1: int): + producer_task = producer_op() + processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) + + return OrderedDict([ + ('Pipeline output 1', producer_task.outputs['Output 1']), + ('Pipeline output 2', processor_task.outputs['Output 2']), + ]) + + graph_component = create_graph_component_from_pipeline_func(pipeline1) + ''' + + task_map = OrderedDict() #Preserving task order + + def task_construction_handler(task: TaskSpec): + #Rewriting task ids so that they're same every time + task_id = task.component_ref.spec.name or "Task" + task_id = _make_name_unique_by_adding_index(task_id, task_map.keys(), ' ') + for output_ref in task.outputs.values(): + output_ref.task_output.task_id = task_id + task_map[task_id] = task + + return task #The handler is a transformation function, so it must pass the task through. + + pipeline_signature = inspect.signature(pipeline_func) + parameters = list(pipeline_signature.parameters.values()) + parameter_names = [param.name for param in parameters] + human_parameter_names_map = generate_unique_name_conversion_table(parameter_names, _convert_to_human_name) + + #Preparing the pipeline_func arguments + pipeline_func_args = {param.name: GraphInputArgument(input_name=human_parameter_names_map[param.name]) for param in parameters} + + try: + #Setting the contextmanager to fix and catch the tasks. + comp._components._created_task_transformation_handler.append(task_construction_handler) + + #Calling the pipeline_func with GraphInputArgument instances as arguments + pipeline_func_result = pipeline_func(**pipeline_func_args) + finally: + comp._components._created_task_transformation_handler.pop() + + graph_output_value_map = OrderedDict() + if pipeline_func_result is None: + pass + elif isinstance(pipeline_func_result, dict): + graph_output_value_map = pipeline_func_result + else: + raise TypeError('Pipeline must return outputs as OrderedDict.') + + #Checking the pipeline_func output object types + for output_name, output_value in graph_output_value_map.items(): + if not isinstance(output_value, TaskOutputArgument): + raise TypeError('Only TaskOutputArgument instances should be returned from graph component, but got "{output_name}" = "{}".'.format(output_name, str(output_value))) + + graph_output_specs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()] + + def convert_inspect_empty_to_none(value): + return value if value is not inspect.Parameter.empty else None + graph_input_specs = [InputSpec(name=human_parameter_names_map[param.name], default=convert_inspect_empty_to_none(param.default)) for param in parameters] #TODO: Convert type annotations to component artifact types + + component_name = _convert_to_human_name(pipeline_func.__name__) + component = ComponentSpec( + name=component_name, + inputs=graph_input_specs, + outputs=graph_output_specs, + implementation=GraphImplementation( + graph=GraphSpec( + tasks=task_map, + output_values=graph_output_value_map, + ) + ) + ) + return component diff --git a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py new file mode 100644 index 00000000000..e2c7b6210be --- /dev/null +++ 
b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py @@ -0,0 +1,122 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import unittest +from collections import OrderedDict +from pathlib import Path + +sys.path.insert(0, __file__ + '/../../../') + +import kfp.components as comp +from kfp.components._python_to_graph_component import create_graph_component_from_pipeline_func + + +component_with_2_inputs_and_0_outputs = '''\ +name: Component with 2 inputs and 0 outputs +inputs: +- {name: Input parameter} +- {name: Input artifact} +implementation: + container: + image: busybox + command: [sh, -c, ' + echo "Input parameter = $0" + echo "Input artifact = $(< $1)" + ' + ] + args: + - {inputValue: Input parameter} + - {inputPath: Input artifact} +''' + + +component_with_0_inputs_and_2_outputs = '''\ +name: Component with 0 inputs and 2 outputs +outputs: +- {name: Output 1} +- {name: Output 2} +implementation: + container: + image: busybox + command: [sh, -c, ' + echo "Data 1" > $0 + echo "Data 2" > $1 + ' + ] + args: + - {outputPath: Output 1} + - {outputPath: Output 2} +''' + + +component_with_2_inputs_and_2_outputs = '''\ +name: Component with 2 inputs and 2 outputs +inputs: +- {name: Input parameter} +- {name: Input artifact} +outputs: +- {name: Output 1} +- {name: Output 2} +implementation: + container: + image: busybox + command: [sh, -c, ' + mkdir -p $(dirname "$2") + mkdir -p $(dirname "$3") + echo "$0" > "$2" + cp "$1" "$3" + ' + ] + args: + - {inputValue: Input parameter} + - {inputPath: Input artifact} + - {outputPath: Output 1} + - {outputPath: Output 2} +''' + + +class PythonPipelineToGraphComponentTestCase(unittest.TestCase): + def test_handle_creating_graph_component_from_pipeline_that_uses_container_components(self): + producer_op = comp.load_component_from_text(component_with_0_inputs_and_2_outputs) + processor_op = comp.load_component_from_text(component_with_2_inputs_and_2_outputs) + #consumer_op = comp.load_component_from_text(component_with_2_inputs_and_0_outputs) + + #def pipeline(pipeline_param_1: int): + def pipeline1(pipeline_param_1: int): + producer_task = producer_op() + processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) + #processor_task = processor_op(pipeline_param_1, producer_task.outputs.output_2) + #consumer_task = consumer_op(processor_task.outputs['Output 1'], processor_task.outputs['Output 2']) + + #return (producer_task.outputs['Output 1'], processor_task.outputs['Output 2']) + #return [producer_task.outputs['Output 1'], processor_task.outputs['Output 2']] + #return {'Pipeline output 1': producer_task.outputs['Output 1'], 'Pipeline output 2': processor_task.outputs['Output 2']} + return OrderedDict([ + ('Pipeline output 1', producer_task.outputs['Output 1']), + ('Pipeline output 2', processor_task.outputs['Output 2']), + ]) + #return namedtuple('Pipeline1Outputs', ['Pipeline output 1', 'Pipeline output 2'])(producer_task.outputs['Output 1'], 
processor_task.outputs['Output 2']) + + + graph_component = create_graph_component_from_pipeline_func(pipeline1) + self.assertEqual(len(graph_component.inputs), 1) + self.assertListEqual([input.name for input in graph_component.inputs], ['Pipeline param 1']) #Relies on human name conversion function stability + self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2']) + self.assertEqual(len(graph_component.implementation.graph.tasks), 2) + + +if __name__ == '__main__': + unittest.main() From a9b120ffc790ea3c39b3670d559ccfa86fe266f0 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sat, 28 Sep 2019 22:26:32 -0700 Subject: [PATCH 02/10] Changed the signatures of exported functions Non-public create_graph_component_spec_from_pipeline_func creates ComponentSpec Public create_graph_component_from_pipeline_func creates component and writes it to file. --- .../components/_python_to_graph_component.py | 28 +++++++++++++++---- ...test_python_pipeline_to_graph_component.py | 4 +-- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 96b856b3c7c..8a90509bfe2 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -19,19 +19,27 @@ import inspect from collections import OrderedDict +from typing import Callable import kfp.components as comp from kfp.components._structures import TaskSpec, ComponentSpec, TaskOutputReference, InputSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec from kfp.components._naming import _make_name_unique_by_adding_index, generate_unique_name_conversion_table, _sanitize_python_function_name, _convert_to_human_name -def create_graph_component_from_pipeline_func(pipeline_func) -> ComponentSpec: - '''Converts python pipeline function to a graph component object that can be saved, shared, composed or submitted for execution. +def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str) -> None: + '''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing. + Pipeline function is a function that only calls component functions and passes outputs to inputs. + This feature is experimental and lacks support for some of the DSL features like conditions and loops. + Only pipelines consisting of loaded components or python components are currently supported (no manually created ContainerOps or ResourceOps). + + Args: + pipeline_func: Python function to convert + output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing. 
     Example:
 
-        producer_op = load_component(component_with_0_inputs_and_2_outputs)
-        processor_op = load_component(component_with_2_inputs_and_2_outputs)
+        producer_op = load_component_from_file('producer/component.yaml')
+        processor_op = load_component_from_file('processor/component.yaml')
 
         def pipeline1(pipeline_param_1: int):
             producer_task = producer_op()
             processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2'])
@@ -42,8 +50,18 @@ def pipeline1(pipeline_param_1: int):
             ('Pipeline output 2', processor_task.outputs['Output 2']),
         ])
 
-        graph_component = create_graph_component_from_pipeline_func(pipeline1)
+        create_graph_component_from_pipeline_func(pipeline1, output_component_file='pipeline.component.yaml')
     '''
+    component_spec = create_graph_component_spec_from_pipeline_func(pipeline_func)
+    if output_component_file:
+        from pathlib import Path
+        from ._yaml_utils import dump_yaml
+        component_dict = component_spec.to_dict()
+        component_yaml = dump_yaml(component_dict)
+        Path(output_component_file).write_text(component_yaml)
+
+
+def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable) -> ComponentSpec:
 
     task_map = OrderedDict() #Preserving task order
 
diff --git a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
index e2c7b6210be..bbaf5d5e49d 100644
--- a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
+++ b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
@@ -21,7 +21,7 @@
 sys.path.insert(0, __file__ + '/../../../')
 
 import kfp.components as comp
-from kfp.components._python_to_graph_component import create_graph_component_from_pipeline_func
+from kfp.components._python_to_graph_component import create_graph_component_spec_from_pipeline_func
 
 
 component_with_2_inputs_and_0_outputs = '''\
@@ -111,7 +111,7 @@ def pipeline1(pipeline_param_1: int):
             #return namedtuple('Pipeline1Outputs', ['Pipeline output 1', 'Pipeline output 2'])(producer_task.outputs['Output 1'], processor_task.outputs['Output 2'])
 
-        graph_component = create_graph_component_from_pipeline_func(pipeline1)
+        graph_component = create_graph_component_spec_from_pipeline_func(pipeline1)
         self.assertEqual(len(graph_component.inputs), 1)
         self.assertListEqual([input.name for input in graph_component.inputs], ['Pipeline param 1']) #Relies on human name conversion function stability
         self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2'])
         self.assertEqual(len(graph_component.implementation.graph.tasks), 2)

From 2003ddf121df2c842613287756f34529d3ad64a7 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Sat, 28 Sep 2019 22:32:08 -0700
Subject: [PATCH 03/10] Switched to using _extract_component_interface to analyze function signature

Stopped humanizing the input names for now.
I think it's beneficial to extract the interface from the function signature
the same way for both container and graph Python components.
--- .../components/_python_to_graph_component.py | 47 +++++++++---------- ...test_python_pipeline_to_graph_component.py | 2 +- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 8a90509bfe2..5938f785ace 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -24,6 +24,7 @@ import kfp.components as comp from kfp.components._structures import TaskSpec, ComponentSpec, TaskOutputReference, InputSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec from kfp.components._naming import _make_name_unique_by_adding_index, generate_unique_name_conversion_table, _sanitize_python_function_name, _convert_to_human_name +from kfp.components._python_op import _extract_component_interface def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str) -> None: @@ -63,6 +64,13 @@ def pipeline1(pipeline_param_1: int): def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable) -> ComponentSpec: + component_spec = _extract_component_interface(pipeline_func) + # Checking the function parameters - they should not have file passing annotations. + input_specs = component_spec.inputs or [] + for input in input_specs: + if input._passing_style: + raise TypeError('Graph component function parameter "{}" cannot have file-passing annotation "{}".'.format(input.name, input._passing_style)) + task_map = OrderedDict() #Preserving task order def task_construction_handler(task: TaskSpec): @@ -75,16 +83,12 @@ def task_construction_handler(task: TaskSpec): return task #The handler is a transformation function, so it must pass the task through. - pipeline_signature = inspect.signature(pipeline_func) - parameters = list(pipeline_signature.parameters.values()) - parameter_names = [param.name for param in parameters] - human_parameter_names_map = generate_unique_name_conversion_table(parameter_names, _convert_to_human_name) - - #Preparing the pipeline_func arguments - pipeline_func_args = {param.name: GraphInputArgument(input_name=human_parameter_names_map[param.name]) for param in parameters} + # Preparing the pipeline_func arguments + # TODO: The key should be original parameter name if different + pipeline_func_args = {input.name: GraphInputArgument(input_name=input.name) for input in input_specs} try: - #Setting the contextmanager to fix and catch the tasks. + #Setting the handler to fix and catch the tasks. 
comp._components._created_task_transformation_handler.append(task_construction_handler) #Calling the pipeline_func with GraphInputArgument instances as arguments @@ -99,28 +103,19 @@ def task_construction_handler(task: TaskSpec): graph_output_value_map = pipeline_func_result else: raise TypeError('Pipeline must return outputs as OrderedDict.') - + #Checking the pipeline_func output object types for output_name, output_value in graph_output_value_map.items(): if not isinstance(output_value, TaskOutputArgument): raise TypeError('Only TaskOutputArgument instances should be returned from graph component, but got "{output_name}" = "{}".'.format(output_name, str(output_value))) - graph_output_specs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()] - - def convert_inspect_empty_to_none(value): - return value if value is not inspect.Parameter.empty else None - graph_input_specs = [InputSpec(name=human_parameter_names_map[param.name], default=convert_inspect_empty_to_none(param.default)) for param in parameters] #TODO: Convert type annotations to component artifact types - - component_name = _convert_to_human_name(pipeline_func.__name__) - component = ComponentSpec( - name=component_name, - inputs=graph_input_specs, - outputs=graph_output_specs, - implementation=GraphImplementation( - graph=GraphSpec( - tasks=task_map, - output_values=graph_output_value_map, - ) + if not component_spec.outputs: + component_spec.outputs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()] + + component_spec.implementation = GraphImplementation( + graph=GraphSpec( + tasks=task_map, + output_values=graph_output_value_map, ) ) - return component + return component_spec diff --git a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py index bbaf5d5e49d..fd43f64c524 100644 --- a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py +++ b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py @@ -113,7 +113,7 @@ def pipeline1(pipeline_param_1: int): graph_component = create_graph_component_spec_from_pipeline_func(pipeline1) self.assertEqual(len(graph_component.inputs), 1) - self.assertListEqual([input.name for input in graph_component.inputs], ['Pipeline param 1']) #Relies on human name conversion function stability + self.assertListEqual([input.name for input in graph_component.inputs], ['pipeline_param_1']) #Relies on human name conversion function stability self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2']) self.assertEqual(len(graph_component.implementation.graph.tasks), 2) From dc92137c597941ec12517e7d112dc060b26f2e66 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sat, 28 Sep 2019 22:37:32 -0700 Subject: [PATCH 04/10] Support outputs declared using pipeline function's return annotation --- .../components/_python_to_graph_component.py | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 5938f785ace..5b8df245101 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -96,20 +96,41 @@ def task_construction_handler(task: TaskSpec): finally: 
comp._components._created_task_transformation_handler.pop() - graph_output_value_map = OrderedDict() + + # Getting graph outputs + output_names = [output.name for output in (component_spec.outputs or [])] + + if len(output_names) == 1 and output_names[0] == 'Output': # TODO: Check whether the NamedTuple syntax was used + pipeline_func_result = [pipeline_func_result] + + if isinstance(pipeline_func_result, tuple) and hasattr(pipeline_func_result, '_asdict'): # collections.namedtuple and typing.NamedTuple + pipeline_func_result = pipeline_func_result._asdict() + + if isinstance(pipeline_func_result, dict): + if output_names: + if set(output_names) != set(pipeline_func_result.keys()): + raise ValueError('Returned outputs do not match outputs specified in the function signature: {} = {}'.format(str(set(pipeline_func_result.keys())), str(set(output_names)))) + if pipeline_func_result is None: - pass + graph_output_value_map = {} elif isinstance(pipeline_func_result, dict): - graph_output_value_map = pipeline_func_result + graph_output_value_map = OrderedDict(pipeline_func_result) + elif isinstance(pipeline_func_result, (list, tuple)): + if output_names: + if len(pipeline_func_result) != len(output_names): + raise ValueError('Expected {} values from pipeline function, but got {}.'.format(len(output_names), len(pipeline_func_result))) + graph_output_value_map = OrderedDict((name_value[0], name_value[1]) for name_value in zip(output_names, pipeline_func_result)) + else: + graph_output_value_map = OrderedDict((output_value.task_output.output_name, output_value) for output_value in pipeline_func_result) # TODO: Fix possible name non-uniqueness (e.g. use task id as prefix or add index to non-unique names) else: - raise TypeError('Pipeline must return outputs as OrderedDict.') + raise TypeError('Pipeline must return outputs as tuple or OrderedDict.') #Checking the pipeline_func output object types for output_name, output_value in graph_output_value_map.items(): if not isinstance(output_value, TaskOutputArgument): raise TypeError('Only TaskOutputArgument instances should be returned from graph component, but got "{output_name}" = "{}".'.format(output_name, str(output_value))) - if not component_spec.outputs: + if not component_spec.outputs and graph_output_value_map: component_spec.outputs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()] component_spec.implementation = GraphImplementation( From c034dc24008bb0e909ed95652d95d8240c391635 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sat, 28 Sep 2019 23:22:32 -0700 Subject: [PATCH 05/10] Cleaned up the test --- ...with_0_inputs_and_2_outputs.component.yaml | 15 ++++ ...with_2_inputs_and_0_outputs.component.yaml | 15 ++++ ...with_2_inputs_and_2_outputs.component.yaml | 22 +++++ ...test_python_pipeline_to_graph_component.py | 87 ++----------------- 4 files changed, 60 insertions(+), 79 deletions(-) create mode 100644 sdk/python/tests/components/test_data/component_with_0_inputs_and_2_outputs.component.yaml create mode 100644 sdk/python/tests/components/test_data/component_with_2_inputs_and_0_outputs.component.yaml create mode 100644 sdk/python/tests/components/test_data/component_with_2_inputs_and_2_outputs.component.yaml diff --git a/sdk/python/tests/components/test_data/component_with_0_inputs_and_2_outputs.component.yaml b/sdk/python/tests/components/test_data/component_with_0_inputs_and_2_outputs.component.yaml new file mode 100644 index 00000000000..ea366a2f4bf --- 
/dev/null +++ b/sdk/python/tests/components/test_data/component_with_0_inputs_and_2_outputs.component.yaml @@ -0,0 +1,15 @@ +name: Component with 0 inputs and 2 outputs +outputs: +- {name: Output 1} +- {name: Output 2} +implementation: + container: + image: busybox + command: [sh, -c, ' + echo "Data 1" > $0 + echo "Data 2" > $1 + ' + ] + args: + - {outputPath: Output 1} + - {outputPath: Output 2} diff --git a/sdk/python/tests/components/test_data/component_with_2_inputs_and_0_outputs.component.yaml b/sdk/python/tests/components/test_data/component_with_2_inputs_and_0_outputs.component.yaml new file mode 100644 index 00000000000..07473a4caba --- /dev/null +++ b/sdk/python/tests/components/test_data/component_with_2_inputs_and_0_outputs.component.yaml @@ -0,0 +1,15 @@ +name: Component with 2 inputs and 0 outputs +inputs: +- {name: Input parameter} +- {name: Input artifact} +implementation: + container: + image: busybox + command: [sh, -c, ' + echo "Input parameter = $0" + echo "Input artifact = $(< $1)" + ' + ] + args: + - {inputValue: Input parameter} + - {inputPath: Input artifact} diff --git a/sdk/python/tests/components/test_data/component_with_2_inputs_and_2_outputs.component.yaml b/sdk/python/tests/components/test_data/component_with_2_inputs_and_2_outputs.component.yaml new file mode 100644 index 00000000000..bd25f001561 --- /dev/null +++ b/sdk/python/tests/components/test_data/component_with_2_inputs_and_2_outputs.component.yaml @@ -0,0 +1,22 @@ +name: Component with 2 inputs and 2 outputs +inputs: +- {name: Input parameter} +- {name: Input artifact} +outputs: +- {name: Output 1} +- {name: Output 2} +implementation: + container: + image: busybox + command: [sh, -c, ' + mkdir -p $(dirname "$2") + mkdir -p $(dirname "$3") + echo "$0" > "$2" + cp "$1" "$3" + ' + ] + args: + - {inputValue: Input parameter} + - {inputPath: Input artifact} + - {outputPath: Output 1} + - {outputPath: Output 2} diff --git a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py index fd43f64c524..57227a3bab5 100644 --- a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py +++ b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py @@ -18,104 +18,33 @@ from collections import OrderedDict from pathlib import Path -sys.path.insert(0, __file__ + '/../../../') - import kfp.components as comp from kfp.components._python_to_graph_component import create_graph_component_spec_from_pipeline_func -component_with_2_inputs_and_0_outputs = '''\ -name: Component with 2 inputs and 0 outputs -inputs: -- {name: Input parameter} -- {name: Input artifact} -implementation: - container: - image: busybox - command: [sh, -c, ' - echo "Input parameter = $0" - echo "Input artifact = $(< $1)" - ' - ] - args: - - {inputValue: Input parameter} - - {inputPath: Input artifact} -''' - - -component_with_0_inputs_and_2_outputs = '''\ -name: Component with 0 inputs and 2 outputs -outputs: -- {name: Output 1} -- {name: Output 2} -implementation: - container: - image: busybox - command: [sh, -c, ' - echo "Data 1" > $0 - echo "Data 2" > $1 - ' - ] - args: - - {outputPath: Output 1} - - {outputPath: Output 2} -''' - - -component_with_2_inputs_and_2_outputs = '''\ -name: Component with 2 inputs and 2 outputs -inputs: -- {name: Input parameter} -- {name: Input artifact} -outputs: -- {name: Output 1} -- {name: Output 2} -implementation: - container: - image: busybox - command: [sh, -c, ' - mkdir -p $(dirname "$2") - mkdir -p 
$(dirname "$3") - echo "$0" > "$2" - cp "$1" "$3" - ' - ] - args: - - {inputValue: Input parameter} - - {inputPath: Input artifact} - - {outputPath: Output 1} - - {outputPath: Output 2} -''' - - class PythonPipelineToGraphComponentTestCase(unittest.TestCase): def test_handle_creating_graph_component_from_pipeline_that_uses_container_components(self): - producer_op = comp.load_component_from_text(component_with_0_inputs_and_2_outputs) - processor_op = comp.load_component_from_text(component_with_2_inputs_and_2_outputs) - #consumer_op = comp.load_component_from_text(component_with_2_inputs_and_0_outputs) + test_data_dir = Path(__file__).parent / 'test_data' + producer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_0_inputs_and_2_outputs.component.yaml')) + processor_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_2_outputs.component.yaml')) + consumer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_0_outputs.component.yaml')) - #def pipeline(pipeline_param_1: int): def pipeline1(pipeline_param_1: int): producer_task = producer_op() processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) - #processor_task = processor_op(pipeline_param_1, producer_task.outputs.output_2) - #consumer_task = consumer_op(processor_task.outputs['Output 1'], processor_task.outputs['Output 2']) + consumer_task = consumer_op(processor_task.outputs['Output 1'], processor_task.outputs['Output 2']) - #return (producer_task.outputs['Output 1'], processor_task.outputs['Output 2']) - #return [producer_task.outputs['Output 1'], processor_task.outputs['Output 2']] - #return {'Pipeline output 1': producer_task.outputs['Output 1'], 'Pipeline output 2': processor_task.outputs['Output 2']} - return OrderedDict([ + return OrderedDict([ # You can safely return normal dict in python 3.6+ ('Pipeline output 1', producer_task.outputs['Output 1']), ('Pipeline output 2', processor_task.outputs['Output 2']), ]) - #return namedtuple('Pipeline1Outputs', ['Pipeline output 1', 'Pipeline output 2'])(producer_task.outputs['Output 1'], processor_task.outputs['Output 2']) - graph_component = create_graph_component_spec_from_pipeline_func(pipeline1) + self.assertEqual(len(graph_component.inputs), 1) self.assertListEqual([input.name for input in graph_component.inputs], ['pipeline_param_1']) #Relies on human name conversion function stability self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2']) - self.assertEqual(len(graph_component.implementation.graph.tasks), 2) + self.assertEqual(len(graph_component.implementation.graph.tasks), 3) if __name__ == '__main__': From 10a0f85ade4d54f4bccf4cdaedad55ddab70ebf6 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sat, 28 Sep 2019 23:43:17 -0700 Subject: [PATCH 06/10] Stop including the whole parent tasks in task output references --- sdk/python/kfp/components/_python_to_graph_component.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 5b8df245101..66f3c40dc15 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -79,6 +79,7 @@ def task_construction_handler(task: TaskSpec): task_id = _make_name_unique_by_adding_index(task_id, task_map.keys(), ' ') for output_ref in task.outputs.values(): output_ref.task_output.task_id = 
task_id + output_ref.task_output.task = None task_map[task_id] = task return task #The handler is a transformation function, so it must pass the task through. From 0becc25586a0b71f6a6c80a4083c0f2af9ac6c6c Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sun, 29 Sep 2019 00:04:33 -0700 Subject: [PATCH 07/10] By default, do not include task component specs in the graph component Remove the component spec from component reference unless it will make the reference empty or unless explicitly asked by the user --- .../kfp/components/_python_to_graph_component.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 66f3c40dc15..5231ca46459 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -27,7 +27,7 @@ from kfp.components._python_op import _extract_component_interface -def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str) -> None: +def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str, embed_component_specs: bool = False) -> None: '''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing. Pipeline function is a function that only calls component functions and passes outputs to inputs. This feature is experimental and lacks support for some of the DSL features like conditions and loops. @@ -36,6 +36,7 @@ def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_co Args: pipeline_func: Python function to convert output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing. + embed_component_specs: Whether to embed component definitions or just reference them. Embedding makes the graph component self-contained. Default is False. Example: @@ -53,7 +54,7 @@ def pipeline1(pipeline_param_1: int): create_graph_component_from_pipeline_func(pipeline1, output_component_file='pipeline.component.yaml') ''' - component_spec = create_graph_component_spec_from_pipeline_func(pipeline_func) + component_spec = create_graph_component_spec_from_pipeline_func(pipeline_func, embed_component_specs) if output_component_file: from pathlib import Path from ._yaml_utils import dump_yaml @@ -62,7 +63,7 @@ def pipeline1(pipeline_param_1: int): Path(output_component_file).write_text(component_yaml) -def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable) -> ComponentSpec: +def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable, embed_component_specs: bool = False) -> ComponentSpec: component_spec = _extract_component_interface(pipeline_func) # Checking the function parameters - they should not have file passing annotations. @@ -81,6 +82,9 @@ def task_construction_handler(task: TaskSpec): output_ref.task_output.task_id = task_id output_ref.task_output.task = None task_map[task_id] = task + # Remove the component spec from component reference unless it will make the reference empty or unless explicitly asked by the user + if not embed_component_specs and any([task.component_ref.name, task.component_ref.url, task.component_ref.digest]): + task.component_ref.spec = None return task #The handler is a transformation function, so it must pass the task through. 
From 128cb0d704310d30af7ed9e3274bba9a6eafe03a Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sun, 29 Sep 2019 00:33:27 -0700 Subject: [PATCH 08/10] Exported the create_graph_component_from_pipeline_func function --- sdk/python/kfp/components/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/kfp/components/__init__.py b/sdk/python/kfp/components/__init__.py index 4f4d8534efc..56df0136778 100644 --- a/sdk/python/kfp/components/__init__.py +++ b/sdk/python/kfp/components/__init__.py @@ -15,4 +15,5 @@ from ._airflow_op import * from ._components import * from ._python_op import * +from ._python_to_graph_component import * from ._component_store import * From e410b4d5923a7e344bfc812acdccb7f955c43cc2 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Mon, 30 Sep 2019 18:29:56 -0700 Subject: [PATCH 09/10] Fixed imports --- .../kfp/components/_python_to_graph_component.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py index 5231ca46459..711bef2ba23 100644 --- a/sdk/python/kfp/components/_python_to_graph_component.py +++ b/sdk/python/kfp/components/_python_to_graph_component.py @@ -21,10 +21,10 @@ from collections import OrderedDict from typing import Callable -import kfp.components as comp -from kfp.components._structures import TaskSpec, ComponentSpec, TaskOutputReference, InputSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec -from kfp.components._naming import _make_name_unique_by_adding_index, generate_unique_name_conversion_table, _sanitize_python_function_name, _convert_to_human_name -from kfp.components._python_op import _extract_component_interface +from . import _components +from ._structures import TaskSpec, ComponentSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec +from ._naming import _make_name_unique_by_adding_index +from ._python_op import _extract_component_interface def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str, embed_component_specs: bool = False) -> None: @@ -94,12 +94,12 @@ def task_construction_handler(task: TaskSpec): try: #Setting the handler to fix and catch the tasks. - comp._components._created_task_transformation_handler.append(task_construction_handler) + _components._created_task_transformation_handler.append(task_construction_handler) #Calling the pipeline_func with GraphInputArgument instances as arguments pipeline_func_result = pipeline_func(**pipeline_func_args) finally: - comp._components._created_task_transformation_handler.pop() + _components._created_task_transformation_handler.pop() # Getting graph outputs From 1e59cfc8922092978f6e6715c3a5cd166cae75c0 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 2 Oct 2019 14:13:05 -0700 Subject: [PATCH 10/10] Updated the copyright year. 
---
 sdk/python/kfp/components/_python_to_graph_component.py         | 2 +-
 .../tests/components/test_python_pipeline_to_graph_component.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py
index 711bef2ba23..6562ede17c0 100644
--- a/sdk/python/kfp/components/_python_to_graph_component.py
+++ b/sdk/python/kfp/components/_python_to_graph_component.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Google LLC
+# Copyright 2019 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
index 57227a3bab5..3c2e266e1ab 100644
--- a/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
+++ b/sdk/python/tests/components/test_python_pipeline_to_graph_component.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Google LLC
+# Copyright 2019 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
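
Usage sketch (illustrative only; pieced together from the docstrings and tests in this patch series): the component file paths and output names below are placeholders, not files shipped with the SDK.

    from collections import OrderedDict
    import kfp.components as comp

    # Load ordinary container components from their definition files.
    producer_op = comp.load_component_from_file('producer/component.yaml')
    processor_op = comp.load_component_from_file('processor/component.yaml')

    def pipeline1(pipeline_param_1: int):
        # A graph pipeline function may only call component functions and
        # pass their outputs to other components' inputs.
        producer_task = producer_op()
        processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2'])
        # Graph outputs are returned as an ordered "output name -> task output" mapping.
        return OrderedDict([
            ('Pipeline output 1', producer_task.outputs['Output 1']),
            ('Pipeline output 2', processor_task.outputs['Output 2']),
        ])

    # Write the graph component definition to a shareable component.yaml file.
    # embed_component_specs=True makes the file self-contained; by default the
    # tasks only keep references to their components (see PATCH 07/10).
    comp.create_graph_component_from_pipeline_func(
        pipeline1,
        output_component_file='pipeline.component.yaml',
        embed_component_specs=True,
    )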