Skip to content

Commit

Permalink
Add PipelineConf method to set ttlSecondsAfterFinished in argo workfl…
Browse files Browse the repository at this point in the history
…ow spec
  • Loading branch information
eterna2 committed Jul 5, 2019
1 parent b957a98 commit a6ffc32
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 0 deletions.
4 changes: 4 additions & 0 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,10 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
'serviceAccountName': 'pipeline-runner'
}
}
# set ttl after workflow finishes
if pipeline.conf.ttl_seconds_after_finished >= 0:
workflow['spec']['ttlSecondsAfterFinished'] = pipeline.conf.ttl_seconds_after_finished

if len(pipeline.conf.image_pull_secrets) > 0:
image_pull_secrets = []
for image_pull_secret in pipeline.conf.image_pull_secrets:
Expand Down
10 changes: 10 additions & 0 deletions sdk/python/kfp/dsl/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class PipelineConf():
def __init__(self):
self.image_pull_secrets = []
self.timeout = 0
self.ttl_seconds_after_finished = None
self.artifact_location = None
self.op_transformers = []

Expand All @@ -80,6 +81,15 @@ def set_timeout(self, seconds: int):
self.timeout = seconds
return self

def set_ttl_seconds_after_finished(self, seconds: int):
"""Configures the ttl after the pipeline has finished.
Args:
seconds: number of seconds for the workflow to be garbage collected after it is finished.
"""
self.ttl_seconds_after_finished = seconds
return self

def set_artifact_location(self, artifact_location):
"""Configures the pipeline level artifact location.
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ def test_py_recursive_while(self):
"""Test pipeline recursive."""
self._test_py_compile_yaml('recursive_while')

def test_py_compile_ttl_after_finished(self):
    """Compile the 'ttl_after_finished' test pipeline and check its YAML."""
    testdata_name = 'ttl_after_finished'
    self._test_py_compile_yaml(testdata_name)

def test_py_resourceop_basic(self):
    """Compile the 'resourceop_basic' test pipeline and check its YAML."""
    testdata_name = 'resourceop_basic'
    self._test_py_compile_yaml(testdata_name)
Expand Down
50 changes: 50 additions & 0 deletions sdk/python/tests/compiler/testdata/ttl_after_finished.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl
from kubernetes import client as k8s_client


class GetFrequentWordOp(dsl.ContainerOp):
    """Container step that prints the most frequent word of a message.

    Bundles the image, the shell command and the output-file mapping so that
    a pipeline only has to supply a step name and the message.
    """

    def __init__(self, name, message):
        """Build the step.

        Args:
          name: An identifier of the step which needs to be unique within a pipeline.
          message: a dsl.PipelineParam object representing an input message.
        """
        # The message is interpolated into a Python one-liner; its printed
        # result is piped through `tee` into /tmp/message.txt, which is the
        # file exposed as the 'word' output below.
        shell_script = (
            'python -c "from collections import Counter; '
            'words = Counter(\'%s\'.split()); print(max(words, key=words.get))" '
            '| tee /tmp/message.txt'
        ) % message
        super().__init__(
            name=name,
            image='python:3.5-jessie',
            command=['sh', '-c'],
            arguments=[shell_script],
            file_outputs={'word': '/tmp/message.txt'},
        )

@dsl.pipeline(
    name='GC after finished',
    description='Get Most Frequent Word and garbage collect the resources after finishing'
)
def save_most_frequent_word(message: str):
    """Pipeline with a single word-counting step and a one-day workflow TTL."""
    one_day_in_seconds = 86400

    # The step object is created for its effect on the pipeline being
    # defined; the local name is not used afterwards.
    frequent_word_op = GetFrequentWordOp(name='get-Frequent', message=message)

    pipeline_conf = dsl.get_pipeline_conf()
    pipeline_conf.set_ttl_seconds_after_finished(one_day_in_seconds)  # gc after 1 day
48 changes: 48 additions & 0 deletions sdk/python/tests/compiler/testdata/ttl_after_finished.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: gc-after-finished-
spec:
arguments:
parameters:
- name: message
entrypoint: save-most-frequent
ttlSecondsAfterFinished: 86400
serviceAccountName: pipeline-runner
templates:
- container:
args:
- python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split());
print(max(words, key=words.get))" | tee /tmp/message.txt
command:
- sh
- -c
image: python:3.5-jessie
inputs:
parameters:
- name: message
name: get-frequent
outputs:
artifacts:
- name: mlpipeline-ui-metadata
path: /mlpipeline-ui-metadata.json
optional: true
- name: mlpipeline-metrics
path: /mlpipeline-metrics.json
optional: true
parameters:
- name: get-frequent-word
valueFrom:
path: /tmp/message.txt
- dag:
tasks:
- arguments:
parameters:
- name: message
value: '{{inputs.parameters.message}}'
name: get-frequent
template: get-frequent
inputs:
parameters:
- name: message
name: save-most-frequent

0 comments on commit a6ffc32

Please sign in to comment.