Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK - Components - Enable loading graph components #2010

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions sdk/python/kfp/components/_component_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pathlib import Path
import requests
from typing import Callable
from . import _components as comp
from ._structures import ComponentReference

Expand Down Expand Up @@ -91,3 +92,24 @@ def load_component(self, name, digest=None, tag=None):
return comp._load_component_from_yaml_or_zip_bytes(response.content, url, component_ref)

raise RuntimeError('Component {} was not found. Tried the following locations:\n{}'.format(name, '\n'.join(tried_locations)))

def _load_component_from_ref(self, component_ref: ComponentReference) -> Callable:
if component_ref.spec:
return comp._create_task_factory_from_component_spec(component_spec=component_ref.spec, component_ref=component_ref)
if component_ref.url:
return self.load_component_from_url(component_ref.url)
return self.load_component(
name=component_ref.name,
digest=component_ref.digest,
tag=component_ref.tag,
)


ComponentStore.default_store = ComponentStore(
local_search_paths=[
'.',
],
url_search_prefixes=[
'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/'
],
)
47 changes: 47 additions & 0 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ def create_task_from_component_and_arguments(pythonic_arguments):
arguments=arguments,
)
task._init_outputs()

if isinstance(component_spec.implementation, GraphImplementation):
return _resolve_graph_task(task, component_spec)

if _created_task_transformation_handler:
task = _created_task_transformation_handler[-1](task)
Expand Down Expand Up @@ -276,3 +279,47 @@ def component_default_to_func_default(component_default: str, is_optional: bool)
)
task_factory.component_spec = component_spec
return task_factory


def _resolve_graph_task(graph_task: TaskSpec, graph_component_spec: ComponentSpec) -> TaskSpec:
from ..components import ComponentStore
component_store = ComponentStore.default_store

graph = graph_component_spec.implementation.graph

outputs_of_tasks = {}
def resolve_argument(argument):
if isinstance(argument, (str, int, float, bool)):
return argument
elif isinstance(argument, GraphInputArgument):
return graph_task.arguments[argument.input_name]
elif isinstance(argument, TaskOutputArgument):
upstream_task_output_ref = argument.task_output
upstream_task_outputs = outputs_of_tasks[upstream_task_output_ref.task_id]
upstream_task_output = upstream_task_outputs[upstream_task_output_ref.output_name]
return upstream_task_output
else:
raise TypeError('Argument for input has unexpected type "{}".'.format(type(argument)))

for task_id, task_spec in graph._toposorted_tasks.items(): # Cannot use graph.tasks here since they might be listed not in dependency order. Especially on python <3.6 where the dicts do not preserve ordering
task_factory = component_store._load_component_from_ref(task_spec.component_ref)
task_arguments = {input_name: resolve_argument(argument) for input_name, argument in task_spec.arguments.items()}
task_component_spec = task_spec.component_ref.spec

input_name_to_pythonic = generate_unique_name_conversion_table([input.name for input in task_component_spec.inputs], _sanitize_python_function_name)
output_name_to_pythonic = generate_unique_name_conversion_table([output.name for output in task_component_spec.outputs], _sanitize_python_function_name)
pythonic_output_name_to_original = {pythonic_name: original_name for original_name, pythonic_name in output_name_to_pythonic.items()}
pythonic_task_arguments = {input_name_to_pythonic[input_name]: argument for input_name, argument in task_arguments.items()}

task_obj = task_factory(**pythonic_task_arguments)
task_outputs_with_pythonic_names = task_obj.outputs
task_outputs_with_original_names = {pythonic_output_name_to_original[pythonic_output_name]: output_value for pythonic_output_name, output_value in task_outputs_with_pythonic_names.items()}
outputs_of_tasks[task_id] = task_outputs_with_original_names

resolved_graph_outputs = OrderedDict([(output_name, resolve_argument(argument)) for output_name, argument in graph.output_values.items()])

# For resolved graph component tasks task.outputs point to the actual tasks that originally produced the output that is later returned from the graph
graph_task.output_references = graph_task.outputs
graph_task.outputs = resolved_graph_outputs

return graph_task
159 changes: 159 additions & 0 deletions sdk/python/tests/components/test_graph_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,165 @@ def test_handle_parsing_task_volumes_and_mounts(self):
component_spec = ComponentSpec.from_dict(struct)
self.assertEqual(component_spec.implementation.graph.tasks['task 1'].k8s_pod_options.spec.volumes[0].name, 'workdir')
self.assertTrue(component_spec.implementation.graph.tasks['task 1'].k8s_pod_options.spec.volumes[0].empty_dir is not None)

def test_load_graph_component(self):
component_text = '''\
inputs:
- {name: graph in 1}
- {name: graph in 2}
outputs:
- {name: graph out 1}
- {name: graph out 2}
- {name: graph out 3}
- {name: graph out 4}
implementation:
graph:
tasks:
task 1:
componentRef:
spec:
name: Component 1
inputs:
- {name: in1_1}
outputs:
- {name: out1_1}
- {name: out1_2}
implementation:
container:
image: busybox
command: [sh, -c, 'echo "$0" > $1; echo "$0" > $2', {inputValue: in1_1}, {outputPath: out1_1}, {outputPath: out1_2}]
arguments:
in1_1: 11
task 2:
componentRef:
spec:
name: Component 2
inputs:
- {name: in2_1}
- {name: in2_2}
outputs:
- {name: out2_1}
implementation:
container:
image: busybox
command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in2_1}, {inputValue: in2_2}, {outputPath: out2_1}]
arguments:
in2_1: 21
in2_2: {taskOutput: {taskId: task 1, outputName: out1_1}}
task 3:
componentRef:
spec:
name: Component 3
inputs:
- {name: in3_1}
- {name: in3_2}
outputs:
- {name: out3_1}
implementation:
container:
image: busybox
command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in3_1}, {inputValue: in3_2}, {outputPath: out3_1}]
arguments:
in3_1: {taskOutput: {taskId: task 2, outputName: out2_1}}
in3_2: {graphInput: graph in 1}
outputValues:
graph out 1: {taskOutput: {taskId: task 3, outputName: out3_1}}
graph out 2: {taskOutput: {taskId: task 1, outputName: out1_2}}
graph out 3: {graphInput: graph in 2}
graph out 4: 42
'''
op = comp.load_component_from_text(component_text)
task = op('graph 1', 'graph 2')
self.assertEqual(len(task.outputs), 4)

def test_load_nested_graph_components(self):
component_text = '''\
inputs:
- {name: graph in 1}
- {name: graph in 2}
outputs:
- {name: graph out 1}
- {name: graph out 2}
- {name: graph out 3}
- {name: graph out 4}
implementation:
graph:
tasks:
task 1:
componentRef:
spec:
name: Component 1
inputs:
- {name: in1_1}
outputs:
- {name: out1_1}
- {name: out1_2}
implementation:
container:
image: busybox
command: [sh, -c, 'echo "$0" > $1; echo "$0" > $2', {inputValue: in1_1}, {outputPath: out1_1}, {outputPath: out1_2}]
arguments:
in1_1: 11
task 2:
componentRef:
spec:
name: Component 2
inputs:
- {name: in2_1}
- {name: in2_2}
outputs:
- {name: out2_1}
implementation:
container:
image: busybox
command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in2_1}, {inputValue: in2_2}, {outputPath: out2_1}]
arguments:
in2_1: 21
in2_2: {taskOutput: {taskId: task 1, outputName: out1_1}}
task 3:
componentRef:
spec:
inputs:
- {name: in3_1}
- {name: in3_2}
outputs:
- {name: out3_1}
implementation:
graph:
tasks:
graph subtask:
componentRef:
spec:
name: Component 3
inputs:
- {name: in3_1}
- {name: in3_2}
outputs:
- {name: out3_1}
implementation:
container:
image: busybox
command: [sh, -c, 'cat "$0" "$1" > $2', {inputValue: in3_1}, {inputValue: in3_2}, {outputPath: out3_1}]
arguments:
in3_1: {graphInput: in3_1}
in3_2: {graphInput: in3_1}
outputValues:
out3_1: {taskOutput: {taskId: graph subtask, outputName: out3_1}}
arguments:
in3_1: {taskOutput: {taskId: task 2, outputName: out2_1}}
in3_2: {graphInput: graph in 1}
outputValues:
graph out 1: {taskOutput: {taskId: task 3, outputName: out3_1}}
graph out 2: {taskOutput: {taskId: task 1, outputName: out1_2}}
graph out 3: {graphInput: graph in 2}
graph out 4: 42
'''
op = comp.load_component_from_text(component_text)
task = op('graph 1', 'graph 2')
self.assertIn('out3_1', str(task.outputs['graph out 1'])) # Checks that the outputs coming from tasks in nested subgraphs are properly resolved.
self.assertIn('out1_2', str(task.outputs['graph out 2']))
self.assertEqual(task.outputs['graph out 3'], 'graph 2')
self.assertEqual(task.outputs['graph out 4'], 42)

#TODO: Test task name conversion to Argo-compatible names

Expand Down