From 446826e21775b1ac854b4ac828ef2629d34d5b21 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 29 Aug 2019 10:25:12 -0700 Subject: [PATCH] SDK - Components - Enable loading graph components The graph components are now correctly loaded and instantiated. Also added pre-configured ComponentStore.default_store --- sdk/python/kfp/components/_component_store.py | 22 +++ sdk/python/kfp/components/_components.py | 47 ++++++ .../tests/components/test_graph_components.py | 159 ++++++++++++++++++ 3 files changed, 228 insertions(+) diff --git a/sdk/python/kfp/components/_component_store.py b/sdk/python/kfp/components/_component_store.py index 1123a891a43..59cb19e7567 100644 --- a/sdk/python/kfp/components/_component_store.py +++ b/sdk/python/kfp/components/_component_store.py @@ -4,6 +4,7 @@ from pathlib import Path import requests +from typing import Callable from . import _components as comp from ._structures import ComponentReference @@ -91,3 +92,24 @@ def load_component(self, name, digest=None, tag=None): return comp._load_component_from_yaml_or_zip_bytes(response.content, url, component_ref) raise RuntimeError('Component {} was not found. Tried the following locations:\n{}'.format(name, '\n'.join(tried_locations))) + + def _load_component_from_ref(self, component_ref: ComponentReference) -> Callable: + if component_ref.spec: + return comp._create_task_factory_from_component_spec(component_spec=component_ref.spec, component_ref=component_ref) + if component_ref.url: + return self.load_component_from_url(component_ref.url) + return self.load_component( + name=component_ref.name, + digest=component_ref.digest, + tag=component_ref.tag, + ) + + +ComponentStore.default_store = ComponentStore( + local_search_paths=[ + '.', + ], + url_search_prefixes=[ + 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/' + ], +) diff --git a/sdk/python/kfp/components/_components.py b/sdk/python/kfp/components/_components.py index 7965cf1aae1..a198bc70db4 100644 --- a/sdk/python/kfp/components/_components.py +++ b/sdk/python/kfp/components/_components.py @@ -239,6 +239,9 @@ def create_task_from_component_and_arguments(pythonic_arguments): arguments=arguments, ) task._init_outputs() + + if isinstance(component_spec.implementation, GraphImplementation): + return _resolve_graph_task(task, component_spec) if _created_task_transformation_handler: task = _created_task_transformation_handler[-1](task) @@ -276,3 +279,47 @@ def component_default_to_func_default(component_default: str, is_optional: bool) ) task_factory.component_spec = component_spec return task_factory + + +def _resolve_graph_task(graph_task: TaskSpec, graph_component_spec: ComponentSpec) -> TaskSpec: + from ..components import ComponentStore + component_store = ComponentStore.default_store + + graph = graph_component_spec.implementation.graph + + outputs_of_tasks = {} + def resolve_argument(argument): + if isinstance(argument, (str, int, float, bool)): + return argument + elif isinstance(argument, GraphInputArgument): + return graph_task.arguments[argument.input_name] + elif isinstance(argument, TaskOutputArgument): + upstream_task_output_ref = argument.task_output + upstream_task_outputs = outputs_of_tasks[upstream_task_output_ref.task_id] + upstream_task_output = upstream_task_outputs[upstream_task_output_ref.output_name] + return upstream_task_output + else: + raise TypeError('Argument for input has unexpected type "{}".'.format(type(argument))) + + for task_id, task_spec in graph._toposorted_tasks.items(): # Cannot use graph.tasks here since they might be listed not in dependency order. Especially on python <3.6 where the dicts do not preserve ordering + task_factory = component_store._load_component_from_ref(task_spec.component_ref) + task_arguments = {input_name: resolve_argument(argument) for input_name, argument in task_spec.arguments.items()} + task_component_spec = task_spec.component_ref.spec + + input_name_to_pythonic = generate_unique_name_conversion_table([input.name for input in task_component_spec.inputs], _sanitize_python_function_name) + output_name_to_pythonic = generate_unique_name_conversion_table([output.name for output in task_component_spec.outputs], _sanitize_python_function_name) + pythonic_output_name_to_original = {pythonic_name: original_name for original_name, pythonic_name in output_name_to_pythonic.items()} + pythonic_task_arguments = {input_name_to_pythonic[input_name]: argument for input_name, argument in task_arguments.items()} + + task_obj = task_factory(**pythonic_task_arguments) + task_outputs_with_pythonic_names = task_obj.outputs + task_outputs_with_original_names = {pythonic_output_name_to_original[pythonic_output_name]: output_value for pythonic_output_name, output_value in task_outputs_with_pythonic_names.items()} + outputs_of_tasks[task_id] = task_outputs_with_original_names + + resolved_graph_outputs = OrderedDict([(output_name, resolve_argument(argument)) for output_name, argument in graph.output_values.items()]) + + # For resolved graph component tasks task.outputs point to the actual tasks that originally produced the output that is later returned from the graph + graph_task.output_references = graph_task.outputs + graph_task.outputs = resolved_graph_outputs + + return graph_task diff --git a/sdk/python/tests/components/test_graph_components.py b/sdk/python/tests/components/test_graph_components.py index b9d295546d2..c37a6192c18 100644 --- a/sdk/python/tests/components/test_graph_components.py +++ b/sdk/python/tests/components/test_graph_components.py @@ -168,6 +168,165 @@ def test_handle_parsing_task_volumes_and_mounts(self): component_spec = ComponentSpec.from_dict(struct) self.assertEqual(component_spec.implementation.graph.tasks['task 1'].k8s_pod_options.spec.volumes[0].name, 'workdir') self.assertTrue(component_spec.implementation.graph.tasks['task 1'].k8s_pod_options.spec.volumes[0].empty_dir is not None) + + def test_load_graph_component(self): + component_text = '''\ +inputs: +- {name: graph in 1} +- {name: graph in 2} +outputs: +- {name: graph out 1} +- {name: graph out 2} +- {name: graph out 3} +- {name: graph out 4} +implementation: + graph: + tasks: + task 1: + componentRef: + spec: + name: Component 1 + inputs: + - {name: in1_1} + outputs: + - {name: out1_1} + - {name: out1_2} + implementation: + container: + image: busybox + command: [sh, -c, 'echo "$0" > $1; echo "$0" > $2', {inputValue: in1_1}, {outputPath: out1_1}, {outputPath: out1_2}] + arguments: + in1_1: 11 + task 2: + componentRef: + spec: + name: Component 2 + inputs: + - {name: in2_1} + - {name: in2_2} + outputs: + - {name: out2_1} + implementation: + container: + image: busybox + command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in2_1}, {inputValue: in2_2}, {outputPath: out2_1}] + arguments: + in2_1: 21 + in2_2: {taskOutput: {taskId: task 1, outputName: out1_1}} + task 3: + componentRef: + spec: + name: Component 3 + inputs: + - {name: in3_1} + - {name: in3_2} + outputs: + - {name: out3_1} + implementation: + container: + image: busybox + command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in3_1}, {inputValue: in3_2}, {outputPath: out3_1}] + arguments: + in3_1: {taskOutput: {taskId: task 2, outputName: out2_1}} + in3_2: {graphInput: graph in 1} + outputValues: + graph out 1: {taskOutput: {taskId: task 3, outputName: out3_1}} + graph out 2: {taskOutput: {taskId: task 1, outputName: out1_2}} + graph out 3: {graphInput: graph in 2} + graph out 4: 42 +''' + op = comp.load_component_from_text(component_text) + task = op('graph 1', 'graph 2') + self.assertEqual(len(task.outputs), 4) + + def test_load_nested_graph_components(self): + component_text = '''\ +inputs: +- {name: graph in 1} +- {name: graph in 2} +outputs: +- {name: graph out 1} +- {name: graph out 2} +- {name: graph out 3} +- {name: graph out 4} +implementation: + graph: + tasks: + task 1: + componentRef: + spec: + name: Component 1 + inputs: + - {name: in1_1} + outputs: + - {name: out1_1} + - {name: out1_2} + implementation: + container: + image: busybox + command: [sh, -c, 'echo "$0" > $1; echo "$0" > $2', {inputValue: in1_1}, {outputPath: out1_1}, {outputPath: out1_2}] + arguments: + in1_1: 11 + task 2: + componentRef: + spec: + name: Component 2 + inputs: + - {name: in2_1} + - {name: in2_2} + outputs: + - {name: out2_1} + implementation: + container: + image: busybox + command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in2_1}, {inputValue: in2_2}, {outputPath: out2_1}] + arguments: + in2_1: 21 + in2_2: {taskOutput: {taskId: task 1, outputName: out1_1}} + task 3: + componentRef: + spec: + inputs: + - {name: in3_1} + - {name: in3_2} + outputs: + - {name: out3_1} + implementation: + graph: + tasks: + graph subtask: + componentRef: + spec: + name: Component 3 + inputs: + - {name: in3_1} + - {name: in3_2} + outputs: + - {name: out3_1} + implementation: + container: + image: busybox + command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in3_1}, {inputValue: in3_2}, {outputPath: out3_1}] + arguments: + in3_1: {graphInput: in3_1} + in3_2: {graphInput: in3_1} + outputValues: + out3_1: {taskOutput: {taskId: graph subtask, outputName: out3_1}} + arguments: + in3_1: {taskOutput: {taskId: task 2, outputName: out2_1}} + in3_2: {graphInput: graph in 1} + outputValues: + graph out 1: {taskOutput: {taskId: task 3, outputName: out3_1}} + graph out 2: {taskOutput: {taskId: task 1, outputName: out1_2}} + graph out 3: {graphInput: graph in 2} + graph out 4: 42 +''' + op = comp.load_component_from_text(component_text) + task = op('graph 1', 'graph 2') + self.assertIn('out3_1', str(task.outputs['graph out 1'])) # Checks that the outputs coming from tasks in nested subgraphs are properly resolved. + self.assertIn('out1_2', str(task.outputs['graph out 2'])) + self.assertEqual(task.outputs['graph out 3'], 'graph 2') + self.assertEqual(task.outputs['graph out 4'], 42) #TODO: Test task name conversion to Argo-compatible names