Skip to content

Commit

Permalink
feat(sdk): add runtime resource requests. Fixes #1956 (#5447)
Browse files Browse the repository at this point in the history
* added resource request at runtime

* fixed things

* Update to use read only parameter insteadt

* added test case and better example

* Updated again

* add the validation

* add to the test suit

* work in progress

* update after feedback

* fix the test

* clean up

* clean up

* fix the path

* add the test again

* clean up

* fix tests

* feedback fix

* comment out and clean up
  • Loading branch information
NikeNano authored Jun 10, 2021
1 parent e860fd6 commit 5db8431
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 17 deletions.
48 changes: 48 additions & 0 deletions samples/core/resource_spec/runtime_resource_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from kfp import dsl, components
from typing import NamedTuple

@components.create_component_from_func
def training_op(n: int) -> int:
# quickly allocate a lot of memory to verify memory is enough
a = [i for i in range(n)]
return len(a)

@components.create_component_from_func
def generate_resouce_request() -> NamedTuple('output', [('memory', str), ('cpu', str)]):
'''Returns the memory and cpu request'''
from collections import namedtuple

resouce_output = namedtuple('output', ['memory', 'cpu'])
return resouce_output('500Mi', '200m')

@dsl.pipeline(
name='Runtime resource request pipeline',
description='An example on how to make resource requests at runtime.'
)
def resource_request_pipeline(n: int = 11234567):
resouce_task = generate_resouce_request()
traning_task = training_op(n)\
.set_memory_limit(resouce_task.outputs['memory'])\
.set_cpu_limit(resouce_task.outputs['cpu'])\
.set_cpu_request('200m')

# Disable cache for KFP v1 mode.
traning_task.execution_options.caching_strategy.max_cache_staleness = 'P0D'

if __name__ == '__main__':
kfp.compiler.Compiler().compile(resource_request_pipeline, __file__ + '.yaml')
48 changes: 48 additions & 0 deletions samples/core/resource_spec/runtime_resource_request_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from .runtime_resource_request import resource_request_pipeline
from ...test.util import run_pipeline_func, TestCase


def EXPECTED_OOM(run_id, run, **kwargs):
'''confirms a sample test case is failing, because of OOM '''
assert run.status == 'Failed'


run_pipeline_func([
TestCase(
pipeline_func=resource_request_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
),
# TODO: blocked by https://github.com/kubeflow/pipelines/issues/5835
# TestCase(
# pipeline_func=resource_request_pipeline,
# mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
# ),
TestCase(
pipeline_func=resource_request_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
arguments={'n': 21234567},
verify_func=EXPECTED_OOM,
),
# TODO: blocked by https://github.com/kubeflow/pipelines/issues/5835
# TestCase(
# pipeline_func=resource_request_pipeline,
# mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
# arguments={'n': 21234567},
# verify_func=EXPECTED_OOM,
# ),
])
2 changes: 2 additions & 0 deletions samples/test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
path: samples.core.loop_parallelism.loop_parallelism_test
- name: resource_spec
path: samples.core.resource_spec.resource_spec_test
- name: runtime_resource_spec
path: samples.core.resource_spec.runtime_resource_request_test
- name: xgboost_sample
path: samples.core.XGBoost.xgboost_sample_test
- name: use_run_id
Expand Down
18 changes: 18 additions & 0 deletions sdk/python/kfp/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,24 @@ def _op_to_template(op: BaseOp):
template['volumes'] = [convert_k8s_obj_to_json(volume) for volume in processed_op.volumes]
template['volumes'].sort(key=lambda x: x['name'])

# Runtime resource requests
if isinstance(op, dsl.ContainerOp) and ('resources' in op.container.keys()):
podSpecPatch = {}
for setting, val in op.container['resources'].items():
for resource, param in val.items():
if (resource in ['cpu', 'memory']) and re.match('^{{inputs.parameters.*}}$', param):
if not 'containers' in podSpecPatch:
podSpecPatch = {'containers':[{'name':'main', 'resources':{}}]}
if setting not in podSpecPatch['containers'][0]['resources']:
podSpecPatch['containers'][0]['resources'][setting] = {resource: param}
else:
podSpecPatch['containers'][0]['resources'][setting][resource] = param
del template['container']['resources'][setting][resource]
if not template['container']['resources'][setting]:
del template['container']['resources'][setting]
if podSpecPatch:
template['podSpecPatch'] = json.dumps(podSpecPatch)

if isinstance(op, dsl.ContainerOp) and op._metadata and not op.is_v2:
template.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/component_spec'] = json.dumps(op._metadata.to_dict(), sort_keys=True)

Expand Down
38 changes: 21 additions & 17 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,27 +289,29 @@ def add_resource_request(self, resource_name, value) -> 'Container':
self.resources.requests.update({resource_name: value})
return self

def set_memory_request(self, memory) -> 'Container':
def set_memory_request(self, memory: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
"""Set memory request (minimum) for this operator.
Args:
memory: a string which can be a number or a number followed by one of
memory(Union[str, PipelineParam]): a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""

self._validate_size_string(memory)
if not isinstance(memory,_pipeline_param.PipelineParam):
self._validate_size_string(memory)
return self.add_resource_request('memory', memory)

def set_memory_limit(self, memory) -> 'Container':
def set_memory_limit(self, memory: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
"""Set memory limit (maximum) for this operator.
Args:
memory: a string which can be a number or a number followed by one of
memory(Union[str, PipelineParam]): a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""
self._validate_size_string(memory)
if self._container_spec:
self._container_spec.resources.memory_limit = _get_resource_number(memory)
if not isinstance(memory,_pipeline_param.PipelineParam):
self._validate_size_string(memory)
if self._container_spec:
self._container_spec.resources.memory_limit = _get_resource_number(memory)
return self.add_resource_limit('memory', memory)

def set_ephemeral_storage_request(self, size) -> 'Container':
Expand All @@ -332,27 +334,29 @@ def set_ephemeral_storage_limit(self, size) -> 'Container':
self._validate_size_string(size)
return self.add_resource_limit('ephemeral-storage', size)

def set_cpu_request(self, cpu) -> 'Container':
def set_cpu_request(self, cpu: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
"""Set cpu request (minimum) for this operator.
Args:
cpu: A string which can be a number or a number followed by "m", which
cpu(Union[str, PipelineParam]): A string which can be a number or a number followed by "m", which
means 1/1000.
"""

self._validate_cpu_string(cpu)
if not isinstance(cpu,_pipeline_param.PipelineParam):
self._validate_cpu_string(cpu)
return self.add_resource_request('cpu', cpu)

def set_cpu_limit(self, cpu) -> 'Container':
def set_cpu_limit(self, cpu: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
"""Set cpu limit (maximum) for this operator.
Args:
cpu: A string which can be a number or a number followed by "m", which
cpu(Union[str, PipelineParam]): A string which can be a number or a number followed by "m", which
means 1/1000.
"""
self._validate_cpu_string(cpu)
if self._container_spec:
self._container_spec.resources.cpu_limit = _get_cpu_number(cpu)

if not isinstance(cpu,_pipeline_param.PipelineParam):
self._validate_cpu_string(cpu)
if self._container_spec:
self._container_spec.resources.cpu_limit = _get_cpu_number(cpu)
return self.add_resource_limit('cpu', cpu)

def set_gpu_limit(self, gpu, vendor='nvidia') -> 'Container':
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,20 @@ def my_pipeline():
self.assertEqual(template['retryStrategy']['backoff']['maxDuration'], backoff_max_duration)


def test_py_runtime_memory_request(self):
"""Test memory request."""

def my_pipeline(memory: str, cpu: str):
some_op().set_cpu_request(memory)

workflow = kfp.compiler.Compiler()._create_workflow(my_pipeline)
name_to_template = {template['name']: template for template in workflow['spec']['templates']}
main_dag_tasks = name_to_template[workflow['spec']['entrypoint']]['dag']['tasks']
template = name_to_template[main_dag_tasks[0]['template']]

self.assertEqual(template['podSpecPatch'], '{"containers": [{"name": "main", "resources": {"requests": {"cpu": "{{inputs.parameters.memory}}"}}}]}')


def test_py_retry_policy_invalid(self):
def my_pipeline():
some_op().set_retry(2, 'Invalid')
Expand Down

0 comments on commit 5db8431

Please sign in to comment.