Skip to content

Commit

Permalink
fix(sdk): Fix nested recursion runafter and param mapping; (kubeflow#609
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Tomcli authored Jun 4, 2021
1 parent 052726a commit af9f03d
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 54 deletions.
20 changes: 19 additions & 1 deletion sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json, copy
import json, copy, re


def _handle_tekton_pipeline_variables(pipeline_run):
Expand Down Expand Up @@ -110,6 +110,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
param['type'] = 'string'
run_after_task_list = []
for run_after_task in task.get('runAfter', []):
for recursive_task in recursive_tasks:
if recursive_task['name'] in run_after_task and '-'.join(group_names[:-1]) not in run_after_task:
run_after_task = '-'.join(group_names[:-1] + [run_after_task])
break
if run_after_task not in denpendency_list:
run_after_task_list.append(run_after_task)
if task.get('runAfter', []):
Expand All @@ -136,6 +140,7 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
}
}
}

# handle loop special case
if custom_task[custom_task_key]['kind'] == 'loops':
# if subvar exist, this is dict loop parameters
Expand All @@ -162,6 +167,19 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']:
custom_task_cr = json.loads(
json.dumps(custom_task_cr).replace(custom_task_param['value'], '$(params.%s)' % custom_task_param['name']))

# need to process task parameters to replace out of scope results
# because nested graph cannot refer to task results outside of the sub-pipeline.
custom_task_cr_task_names = [custom_task_cr_task['name'] for custom_task_cr_task in custom_task_cr_tasks]
for task in custom_task_cr_tasks:
for task_param in task.get('params', []):
if '$(tasks.' in task_param['value']:
param_results = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)', task_param['value'])
for param_result in param_results:
if param_result[0] not in custom_task_cr_task_names:
task['params'] = json.loads(
json.dumps(task['params']).replace(task_param['value'],
'$(params.%s-%s)' % param_result))
custom_task_crs.append(custom_task_cr)
custom_task[custom_task_key]['spec']['params'] = sorted(custom_task[custom_task_key]['spec']['params'],
key=lambda k: k['name'])
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ def test_withitem_nested_workflow(self):
from .testdata.withitem_nested import pipeline
self._test_pipeline_workflow(pipeline, 'withitem_nested.yaml')

def test_nested_recur_runafter_workflow(self):
"""
Test compiling a nested recursion pipeline with graph dependencies.
"""
from .testdata.nested_recur_runafter import flipcoin
self._test_pipeline_workflow(flipcoin, 'nested_recur_runafter.yaml')

def test_withitem_multi_nested_workflow(self):
"""
Test compiling a withitem multi nested in workflow.
Expand Down
76 changes: 76 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_recur_runafter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp_tekton.compiler import TektonCompiler


class Coder:
def empty(self):
return ""


TektonCompiler._get_unique_id_code = Coder.empty


def flip_coin_op():
"""Flip a coin and output heads or tails randomly."""
return dsl.ContainerOp(
name='Flip coin',
image='python:alpine3.6',
command=['sh', '-c'],
arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
'else \'tails\'; print(result)" | tee /tmp/output'],
file_outputs={'output': '/tmp/output'}
)


def print_op(msg):
"""Print a message."""
return dsl.ContainerOp(
name='Print',
image='alpine:3.6',
command=['echo', msg],
)


@dsl.pipeline(
name='nested recursion pipeline',
description='shows how to use graph_component and nested recursion.'
)
def flipcoin(maxVal=12):
@dsl._component.graph_component
def flip_component(flip_result, maxVal):
@dsl._component.graph_component
def flip_component_b(flip_result_b, maxVal_b):
with dsl.Condition(flip_result_b == 'heads'):
print_flip_b = print_op(flip_result_b)
flipB = flip_coin_op().after(print_flip_b)
flip_component_b(flipB.output, maxVal_b)
recur_b = flip_component_b(flip_result, maxVal)
print_flip = print_op(flip_result)
with dsl.Condition(flip_result == 'tails'):
flipA = flip_coin_op().after(print_flip, recur_b) # note this part!
flip_component(flipA.output, maxVal)

flip_out = flip_coin_op()
flip_loop = flip_component(flip_out.output, maxVal)
print_op('cool, it is over. %s' % flip_out.output).after(flip_loop)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler as Compiler
# For Argo, uncomment the line below
# from kfp.compiler import Compiler
Compiler().compile(flipcoin, __file__.replace('.py', '.yaml'))
107 changes: 107 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_recur_runafter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use graph_component
and nested recursion.", "inputs": [{"default": "12", "name": "maxVal", "optional":
true}], "name": "nested recursion pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"flip-coin": [["output", "$(results.output.path)"]],
"flip-coin-2": [["output", "$(results.output.path)"]], "flip-coin-3": [["output",
"$(results.output.path)"]], "print": [], "print-2": [], "print-3": []}'
tekton.dev/input_artifacts: '{"print": [{"name": "flip-coin-output", "parent_task":
"flip-coin"}], "print-2": [{"name": "flip-coin-output", "parent_task": "flip-coin"}],
"print-3": [{"name": "flip-coin-output", "parent_task": "flip-coin"}]}'
tekton.dev/output_artifacts: '{"flip-coin": [{"key": "artifacts/$PIPELINERUN/flip-coin/output.tgz",
"name": "flip-coin-output", "path": "/tmp/output"}], "flip-coin-2": [{"key":
"artifacts/$PIPELINERUN/flip-coin-2/output.tgz", "name": "flip-coin-2-output",
"path": "/tmp/output"}], "flip-coin-3": [{"key": "artifacts/$PIPELINERUN/flip-coin-3/output.tgz",
"name": "flip-coin-3-output", "path": "/tmp/output"}]}'
name: nested-recursion-pipeline
spec:
params:
- name: maxVal
value: '12'
pipelineSpec:
params:
- default: '12'
name: maxVal
tasks:
- name: flip-coin
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
results:
- description: /tmp/output
name: output
steps:
- args:
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
else 'tails'; print(result)" | tee $(results.output.path)
command:
- sh
- -c
image: python:alpine3.6
name: main
timeout: 0s
- name: print-3
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
runAfter:
- nested-recursion-pipeline-graph-flip-component-1
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
params:
- name: flip-coin-output
steps:
- command:
- echo
- cool, it is over. $(inputs.params.flip-coin-output)
image: alpine:3.6
name: main
timeout: 0s
- name: nested-recursion-pipeline-graph-flip-component-1
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
- name: just_one_iteration
value:
- '1'
- name: maxVal
value: $(params.maxVal)
runAfter:
- flip-coin
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: nested-recursion-pipeline-graph-flip-component-1
timeout: 0s
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
metadata:
name: nested-recursion-pipeline-graph-flip-component-1
spec:
pipelineSpec:
params:
- name: flip-coin-output
type: string
- name: just_one_iteration
type: string
- name: maxVal
type: string
tasks:
- name: print-2
params:
- name: flip-coin-output
value: $(params.flip-coin-output)
taskSpec:
steps:
- name: main
command:
- echo
- $(inputs.params.flip-coin-output)
image: alpine:3.6
params:
- name: flip-coin-output
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
tekton.dev/template: ''
timeout: 0s
- name: flip-coin-3
taskSpec:
steps:
- name: main
args:
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
else 'tails'; print(result)" | tee $(results.output.path)
command:
- sh
- -c
image: python:alpine3.6
results:
- name: output
description: /tmp/output
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
tekton.dev/template: ''
when:
- input: $(tasks.condition-4.results.outcome)
operator: in
values:
- "true"
runAfter:
- print-2
- nested-recursion-pipeline-graph-flip-component-b-2
timeout: 0s
- name: condition-4
params:
- name: operand1
value: $(params.flip-coin-output)
- name: operand2
value: tails
- name: operator
value: ==
taskSpec:
results:
- name: outcome
description: Conditional task outcome
params:
- name: operand1
type: string
- name: operand2
type: string
- name: operator
type: string
steps:
- script: |-
python -c 'import sys
input1=str.rstrip(sys.argv[1])
input2=str.rstrip(sys.argv[2])
try:
input1=int(input1)
input2=int(input2)
except:
input1=str(input1)
outcome="true" if (input1 $(inputs.params.operator) input2) else "false"
f = open("/tekton/results/outcome", "w")
f.write(outcome)
f.close()' '$(inputs.params.operand1)' '$(inputs.params.operand2)'
image: python:alpine3.6
runAfter:
- nested-recursion-pipeline-graph-flip-component-b-2
- name: flip-component
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: nested-recursion-pipeline-graph-flip-component-1
params:
- name: just_one_iteration
value:
- '1'
- name: flip-coin-output
value: $(tasks.flip-coin-3.results.output)
- name: maxVal
value: $(params.maxVal)
when:
- input: $(tasks.condition-4.results.outcome)
operator: in
values:
- "true"
- name: nested-recursion-pipeline-graph-flip-component-b-2
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: nested-recursion-pipeline-graph-flip-component-b-2
params:
- name: flip-coin-output
value: $(params.flip-coin-output)
- name: just_one_iteration
value:
- '1'
- name: maxVal
value: $(params.maxVal)
iterateParam: just_one_iteration
Loading

0 comments on commit af9f03d

Please sign in to comment.