Skip to content

Commit

Permalink
consolidate the value of results - address comments from ckadner (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenglixa authored Jun 11, 2020
1 parent 81f578a commit d592ebf
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 117 deletions.
72 changes: 28 additions & 44 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import re
from typing import Optional, Set
from . import _op_to_template
from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name


def fix_big_data_passing(
Expand Down Expand Up @@ -97,11 +98,6 @@ def fix_big_data_passing(
template for template in workflow if template['kind'] == 'PipelineRun'
]

# we need to make the format in tekton unified for params, artifacts, outputs.
container_templates = unify_container_params(container_templates)
pipeline_templates = unify_pipeline_params(pipeline_templates)
pipelinerun_templates = unify_pipelinerun_params(pipelinerun_templates)

# 1. Index the pipelines to understand how data is being passed and which
# inputs/outputs are connected to each other.
template_input_to_parent_pipeline_inputs = {
Expand Down Expand Up @@ -397,6 +393,20 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
if (template.get('metadata', {}).get('name'),
output_parameter['name']) in outputs_consumed_as_parameters
]
# tekton results doesn't support underscore
renamed_results_in_pipeline_task = set()
for task_result in spec['results']:
task_result_old_name = task_result.get('name')
task_result_new_name = sanitize_k8s_name(task_result_old_name)
if task_result_new_name != task_result_old_name:
task_result['name'] = task_result_new_name
renamed_results_in_pipeline_task.add(
(task_result_old_name, task_result_new_name))
for renamed_result in renamed_results_in_pipeline_task:
# Change results.downloaded_resultOutput to results.downloaded-resultoutput
template['spec'] = replace_big_data_placeholder(
spec, 'results.%s' % renamed_result[0],
'results.%s' % renamed_result[1])

# Remove pipeline task parameters unless they're used downstream
for template in pipeline_templates:
Expand All @@ -412,6 +422,18 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
or task['taskRef']['name'] in resource_template_names
]

# tekton results doesn't support underscore
for argument in task['params']:
argument_value = argument.get('value')
argument_placeholder_parts = deconstruct_tekton_single_placeholder(
argument_value)
if len(argument_placeholder_parts) == 4 \
and argument_placeholder_parts[0] == 'tasks':
argument['value'] = '$(tasks.%s.%s.%s)' % (
argument_placeholder_parts[1],
argument_placeholder_parts[2],
sanitize_k8s_name(argument_placeholder_parts[3]))

# Need to confirm:
# I didn't find the use cases to support workflow parameter consumed as artifacts downstream in tekton.
# Whether this case need to be supporting?
Expand All @@ -420,44 +442,6 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
return workflow


# Replaced '_' to '-' to make the format in tekton unified for params, artifacts, outputs.
def unify_container_params(templates: list):
for template in templates:
template_params = template.get('spec', {}).get('params', [])
for template_param in template_params:
template_param['name'] = template_param.get('name').replace(
'_', '-')
template_artifacts = template.get('spec', {}).get('artifacts', [])
for template_artifact in template_artifacts:
template_artifact['raw']['data'] = template_artifact.get(
'raw', {}).get('data').replace('_', '-')
return templates


def unify_pipeline_params(templates: list):
for template in templates:
tasks = template.get('spec', {}).get('tasks', [])
pipeline_params = template.get('spec', {}).get('params', [])
for pipeline_param in pipeline_params:
pipeline_param['name'] = pipeline_param.get('name').replace(
'_', '-')
for task in tasks:
task_params = task.get('params')
for task_param in task_params:
task_param['name'] = task_param.get('name').replace('_', '-')
task_param['value'] = task_param.get('value').replace('_', '-')
return templates


def unify_pipelinerun_params(templates: list):
for template in templates:
pipelinerun_params = template.get('spec', {}).get('params', [])
for pipelinerun_param in pipelinerun_params:
pipelinerun_param['name'] = pipelinerun_param.get('name').replace(
'_', '-')
return templates


def extract_all_tekton_placeholders(template: dict) -> Set[str]:
template_str = json.dumps(template)
placeholders = set(re.findall('\\$\\(([-._a-zA-Z0-9]+)\\)', template_str))
Expand All @@ -474,7 +458,7 @@ def extract_tekton_input_parameter_name(s: str) -> Optional[str]:

def deconstruct_tekton_single_placeholder(s: str) -> List[str]:
if not re.fullmatch('\\$\\([-._a-zA-Z0-9]+\\)', s):
return None
return []
return s.lstrip('$(').rstrip(')').split('.')


Expand Down
3 changes: 1 addition & 2 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def _process_parameters(processed_op: BaseOp,
template['spec']['results'] = []
copy_results_step = _get_base_step('copy-results')
for name, path in processed_op.file_outputs.items():
name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore
template['spec']['results'].append({
'name': name,
'description': path
Expand Down Expand Up @@ -359,7 +358,7 @@ def _process_base_ops(op: BaseOp):

# map param's (unsanitized pattern or serialized str pattern) -> input param var str
map_to_tmpl_var = {
(param.pattern or str(param)): '$(inputs.params.%s)' % param.full_name.replace('_', '-') # Tekton change
(param.pattern or str(param)): '$(inputs.params.%s)' % param.full_name # Tekton change
for param in op.inputs
}

Expand Down
3 changes: 1 addition & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
else:
for pp in op.inputs:
if tp['name'] == pp.full_name:
# replace '_' to '-' since tekton results doesn't support underscore
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name.replace('_', '-'))
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
# Create input artifact tracking annotation
if self.enable_artifacts:
input_annotation = self.input_artifacts.get(task['name'], [])
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/tests/compiler/testdata/artifact_location.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
params:
- name: bucket
- name: namespace
- name: secret-name
- name: secret_name
- name: tag
results:
- description: /tmp/output.txt
Expand Down Expand Up @@ -49,12 +49,12 @@ spec:
valueFrom:
secretKeyRef:
key: accesskey
name: $(inputs.params.secret-name)
name: $(inputs.params.secret_name)
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
key: secretkey
name: $(inputs.params.secret-name)
name: $(inputs.params.secret_name)
image: minio/mc
name: copy-artifacts
script: '#!/usr/bin/env sh
Expand Down Expand Up @@ -85,7 +85,7 @@ metadata:
spec:
params:
- default: mlpipeline-minio-artifact
name: secret-name
name: secret_name
- default: 1.31.0
name: tag
- default: kubeflow
Expand All @@ -99,8 +99,8 @@ spec:
value: $(params.bucket)
- name: namespace
value: $(params.namespace)
- name: secret-name
value: $(params.secret-name)
- name: secret_name
value: $(params.secret_name)
- name: tag
value: $(params.tag)
taskRef:
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/tests/compiler/testdata/big_data_passing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ spec:
- --count
- '5000'
- --output-text
- $(workspaces.repeat-line.path)/repeat-line-output-text
- $(workspaces.repeat-line.path)/repeat-line-output_text
command:
- python3
- -u
Expand Down Expand Up @@ -66,7 +66,7 @@ spec:
steps:
- args:
- --text
- $(workspaces.print-text.path)/repeat-line-output-text
- $(workspaces.print-text.path)/repeat-line-output_text
command:
- python3
- -u
Expand Down Expand Up @@ -131,9 +131,9 @@ spec:
- --source
- /tmp/inputs/source/data
- --odd-lines
- $(workspaces.split-text-lines.path)/split-text-lines-odd-lines
- $(workspaces.split-text-lines.path)/split-text-lines-odd_lines
- --even-lines
- $(workspaces.split-text-lines.path)/split-text-lines-even-lines
- $(workspaces.split-text-lines.path)/split-text-lines-even_lines
command:
- python3
- -u
Expand Down Expand Up @@ -178,7 +178,7 @@ spec:
steps:
- args:
- --text
- $(workspaces.print-text-2.path)/split-text-lines-odd-lines
- $(workspaces.print-text-2.path)/split-text-lines-odd_lines
command:
- python3
- -u
Expand Down Expand Up @@ -209,7 +209,7 @@ spec:
steps:
- args:
- --text
- $(workspaces.print-text-3.path)/split-text-lines-even-lines
- $(workspaces.print-text-3.path)/split-text-lines-even_lines
command:
- python3
- -u
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/tests/compiler/testdata/katib.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
- name: parallelTrialCount
results:
- description: /output.txt
name: bestHyperParameter
name: besthyperparameter
steps:
- args:
- --name
Expand Down Expand Up @@ -61,7 +61,7 @@ spec:
''parameterType'': ''categorical'', ''feasibleSpace'': {''list'': [''sgd'',
''adam'', ''ftrl'']}}]'
- --outputFile
- $(results.bestHyperParameter.path)
- $(results.besthyperparameter.path)
- --deleteAfterDone
- $(inputs.params.deleteAfterDone)
- --experimentTimeoutMinutes
Expand Down Expand Up @@ -137,6 +137,6 @@ spec:
- name: my-out-cop
params:
- name: mnist-hpo-bestHyperParameter
value: $(tasks.mnist-hpo.results.bestHyperParameter)
value: $(tasks.mnist-hpo.results.besthyperparameter)
taskRef:
name: my-out-cop
36 changes: 18 additions & 18 deletions sdk/python/tests/compiler/testdata/loop_static.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ metadata:
name: my-in-coop1
spec:
params:
- name: loop-item-param-subvar-A-a
- name: my-pipe-param
- name: loop-item-param-subvar-A_a
- name: my_pipe_param
steps:
- args:
- echo op1 $(inputs.params.loop-item-param-subvar-A-a) $(inputs.params.my-pipe-param)
- echo op1 $(inputs.params.loop-item-param-subvar-A_a) $(inputs.params.my_pipe_param)
command:
- sh
- -c
Expand All @@ -35,10 +35,10 @@ metadata:
name: my-in-coop2
spec:
params:
- name: loop-item-param-subvar-B-b
- name: loop-item-param-subvar-B_b
steps:
- args:
- echo op2 $(inputs.params.loop-item-param-subvar-B-b)
- echo op2 $(inputs.params.loop-item-param-subvar-B_b)
command:
- sh
- -c
Expand All @@ -51,10 +51,10 @@ metadata:
name: my-out-cop
spec:
params:
- name: my-pipe-param
- name: my_pipe_param
steps:
- args:
- echo $(inputs.params.my-pipe-param)
- echo $(inputs.params.my_pipe_param)
command:
- sh
- -c
Expand All @@ -72,39 +72,39 @@ metadata:
spec:
params:
- default: '10'
name: my-pipe-param
name: my_pipe_param
tasks:
- name: my-in-coop1-loop-item-0
params:
- name: loop-item-param-subvar-A-a
- name: loop-item-param-subvar-A_a
value: '1'
- name: my-pipe-param
value: $(params.my-pipe-param)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-in-coop1
- name: my-in-coop1-loop-item-1
params:
- name: loop-item-param-subvar-A-a
- name: loop-item-param-subvar-A_a
value: '10'
- name: my-pipe-param
value: $(params.my-pipe-param)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-in-coop1
- name: my-in-coop2-loop-item-0
params:
- name: loop-item-param-subvar-B-b
- name: loop-item-param-subvar-B_b
value: '2'
taskRef:
name: my-in-coop2
- name: my-in-coop2-loop-item-1
params:
- name: loop-item-param-subvar-B-b
- name: loop-item-param-subvar-B_b
value: '20'
taskRef:
name: my-in-coop2
- name: my-out-cop-loop-item-0
params:
- name: my-pipe-param
value: $(params.my-pipe-param)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-out-cop
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/pipelineparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def pipelineparams_pipeline(tag: str = 'latest', sleep_ms: int = 10):
command=['sh', '-c'],
arguments=['sleep %s; wget localhost:5678 -O /tmp/results.txt' % sleep_ms],
sidecars=[echo],
file_outputs={'downloaded': '/tmp/results.txt'})
file_outputs={'downloaded_resultOutput': '/tmp/results.txt'})

op2 = dsl.ContainerOp(
name='echo',
Expand Down
Loading

0 comments on commit d592ebf

Please sign in to comment.