Skip to content

Commit

Permalink
Fixed detection of parameter output references
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark-kun committed Sep 20, 2019
1 parent f8fca37 commit 0595ac5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
19 changes: 18 additions & 1 deletion sdk/python/kfp/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ def fix_big_data_passing(workflow: dict) -> dict:
elif placeholder_type == 'item' or placeholder_type == 'workflow' or placeholder_type == 'pod':
raise RuntimeError('DAG output value "{}" is not supported.'.format(output_value))
else:
raise AssertionError
raise AssertionError('Unexpected placeholder type "{}".'.format(placeholder_type))
# Finshed indexing the DAGs

# 2. Search for direct data consumers in container/resource templates and some DAG task attributes (e.g. conditions and loops) to find out which inputs are directly consumed as parameters/artifacts.
inputs_directly_consumed_as_parameters = set()
inputs_directly_consumed_as_artifacts = set()
outputs_directly_consumed_as_parameters = set()

# Searching for artifact input consumers in container template inputs
for template in container_templates:
Expand All @@ -123,6 +124,7 @@ def fix_big_data_passing(workflow: dict) -> dict:
# Searching for parameter input consumers in DAG templates (.when, .withParam, etc)
for template in dag_templates:
template_name = template['name']
task_name_to_template_name = {task['name']: task['template'] for task in dag_tasks}
for task in template['dag']['tasks']:
# We do not care about the inputs mentioned in task arguments since we will be free to switch them from parameters to artifacts
# TODO: Handle cases where argument value is a string containing placeholders (not just consisting of a single placeholder) or the input name contains placeholder
Expand All @@ -138,6 +140,19 @@ def fix_big_data_passing(workflow: dict) -> dict:
inputs_directly_consumed_as_parameters.add((template_name, input_name))
else:
raise AssertionError
elif placeholder_type == 'tasks':
upstream_task_name = argument_placeholder_parts[1]
assert argument_placeholder_parts[2] == 'outputs'
assert argument_placeholder_parts[3] == 'parameters'
upstream_output_name = argument_placeholder_parts[4]
upstream_template_name = task_name_to_template_name[upstream_task_name]
outputs_directly_consumed_as_parameters.add((upstream_template_name, upstream_output_name))
elif placeholder_type == 'workflow' or placeholder_type == 'pod':
pass
elif placeholder_type == 'item':
raise AssertionError('The "{{item}}" placeholder is not expected outside task arguments.')
else:
raise AssertionError('Unexpected placeholder type "{}".'.format(placeholder_type))

# Searching for parameter input consumers in container and resource templates
for template in container_templates + resource_templates:
Expand Down Expand Up @@ -198,6 +213,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs, marked_outputs):
mark_upstream_ios_of_input(input, inputs_consumed_as_parameters, outputs_consumed_as_parameters)
for input in inputs_directly_consumed_as_artifacts:
mark_upstream_ios_of_input(input, inputs_consumed_as_artifacts, outputs_consumed_as_artifacts)
for output in outputs_directly_consumed_as_parameters:
mark_upstream_ios_of_output(output, inputs_consumed_as_parameters, outputs_consumed_as_parameters)


# 4. Convert the inputs, outputs and arguments based on how they're consumed downstream.
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/tests/compiler/testdata/coin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ spec:
artifacts:
- name: flip-output
path: /tmp/output
parameters:
- name: flip-output
valueFrom:
path: /tmp/output
- container:
args:
- python -c "import random; result = 'heads' if random.randint(0,1) == 0 else
Expand Down

0 comments on commit 0595ac5

Please sign in to comment.