Generalize container resource settings in the KFP SDK.

Replaces the per-field cpu/memory attributes with resource_limits and
resource_requests dicts, adds a GPU limit setter and nodeSelector
constraints, and emits both in the compiled template.

NOTE(review): the line breaks and indentation of this patch were lost and
have been reconstructed (2-space indent assumed); confirm the context lines
against the target files before applying.

diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 59f539b2955..d3c04b94558 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -128,21 +128,16 @@ def _op_to_template(self, op):
       template['container']['command'] = op.command
 
     # Set resources.
-    if op.memory_limit or op.cpu_limit or op.memory_request or op.cpu_request:
+    if op.resource_limits or op.resource_requests:
       template['container']['resources'] = {}
-    if op.memory_limit or op.cpu_limit:
-      template['container']['resources']['limits'] = {}
-      if op.memory_limit:
-        template['container']['resources']['limits']['memory'] = op.memory_limit
-      if op.cpu_limit:
-        template['container']['resources']['limits']['cpu'] = op.cpu_limit
-
-    if op.memory_request or op.cpu_request:
-      template['container']['resources']['requests'] = {}
-      if op.memory_request:
-        template['container']['resources']['requests']['memory'] = op.memory_request
-      if op.cpu_request:
-        template['container']['resources']['requests']['cpu'] = op.cpu_request
+    if op.resource_limits:
+      template['container']['resources']['limits'] = op.resource_limits
+    if op.resource_requests:
+      template['container']['resources']['requests'] = op.resource_requests
+
+    # Set nodeSelector.
+    if op.node_selector:
+      template['nodeSelector'] = op.node_selector
 
     if op.env_variables:
       template['container']['env'] = list(map(self._convert_k8s_obj_to_dic, op.env_variables))
diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py
index 6cc99074a57..763693cbc73 100644
--- a/sdk/python/kfp/dsl/_container_op.py
+++ b/sdk/python/kfp/dsl/_container_op.py
@@ -53,10 +53,9 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
     self.command = command
     self.arguments = arguments
     self.is_exit_handler = is_exit_handler
-    self.memory_limit = None
-    self.memory_request = None
-    self.cpu_limit = None
-    self.cpu_request = None
+    self.resource_limits = {}
+    self.resource_requests = {}
+    self.node_selector = {}
     self.volumes = []
     self.volume_mounts = []
     self.env_variables = []
@@ -112,6 +111,39 @@ def _validate_cpu_string(self, cpu_string):
       raise ValueError('Invalid cpu string. Should be float or integer, or integer followed '
                        'by "m".')
 
+  def _validate_gpu_string(self, gpu_string):
+    """Validate that the given value is a valid gpu limit, i.e. a positive integer."""
+
+    try:
+      gpu_value = int(gpu_string)
+    except (TypeError, ValueError):
+      raise ValueError('Invalid gpu string. Should be integer.')
+
+    if gpu_value <= 0:
+      raise ValueError('gpu must be positive integer.')
+
+  def add_resource_limit(self, resource_name, value):
+    """Add the resource limit of the container.
+
+    Args:
+      resource_name: The name of the resource. It can be cpu, memory, etc.
+      value: The string value of the limit.
+    """
+
+    self.resource_limits[resource_name] = value
+    return self
+
+  def add_resource_request(self, resource_name, value):
+    """Add the resource request of the container.
+
+    Args:
+      resource_name: The name of the resource. It can be cpu, memory, etc.
+      value: The string value of the request.
+    """
+
+    self.resource_requests[resource_name] = value
+    return self
+
   def set_memory_request(self, memory):
     """Set memory request (minimum) for this operator.
 
@@ -121,8 +153,7 @@ def set_memory_request(self, memory):
     """
 
     self._validate_memory_string(memory)
-    self.memory_request = memory
-    return self
+    return self.add_resource_request("memory", memory)
 
   def set_memory_limit(self, memory):
     """Set memory limit (maximum) for this operator.
@@ -132,8 +163,7 @@ def set_memory_limit(self, memory):
               "E", "P", "T", "G", "M", "K".
     """
     self._validate_memory_string(memory)
-    self.memory_limit = memory
-    return self
+    return self.add_resource_limit("memory", memory)
 
   def set_cpu_request(self, cpu):
     """Set cpu request (minimum) for this operator.
@@ -143,8 +173,7 @@ def set_cpu_request(self, cpu):
     """
 
     self._validate_cpu_string(cpu)
-    self.cpu_request = cpu
-    return self
+    return self.add_resource_request("cpu", cpu)
 
   def set_cpu_limit(self, cpu):
     """Set cpu limit (maximum) for this operator.
@@ -154,8 +183,25 @@ def set_cpu_limit(self, cpu):
     """
 
     self._validate_cpu_string(cpu)
-    self.cpu_limit = cpu
-    return self
+    return self.add_resource_limit("cpu", cpu)
+
+  def set_gpu_limit(self, gpu, vendor="nvidia"):
+    """Set gpu limit for the operator. This function adds '{vendor}.com/gpu' into resource limit.
+    Note that there is no need to add GPU request. GPUs are only supposed to be specified in
+    the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/.
+
+    Args:
+      gpu: A string which must be a positive integer.
+      vendor: Optional. A string which is the vendor of the requested gpu. The supported values
+              are: 'nvidia' (default), and 'amd'.
+    """
+
+    self._validate_gpu_string(gpu)
+    if vendor not in ('nvidia', 'amd'):
+      raise ValueError('vendor can only be nvidia or amd.')
+
+    return self.add_resource_limit("%s.com/gpu" % vendor, gpu)
+
 
   def add_volume(self, volume):
     """Add K8s volume to the container
@@ -193,5 +239,18 @@ def add_env_variable(self, env_variable):
     self.env_variables.append(env_variable)
     return self
 
+  def add_node_selector_constraint(self, label_name, value):
+    """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the
+    container to be eligible to run on a node, the node must have each of the constraints
+    as one of its labels.
+
+    Args:
+      label_name: The name of the constraint label.
+      value: The value of the constraint label.
+    """
+
+    self.node_selector[label_name] = value
+    return self
+
   def __repr__(self):
     return str({self.__class__.__name__: self.__dict__})
diff --git a/sdk/python/tests/compiler/testdata/basic.py b/sdk/python/tests/compiler/testdata/basic.py
index b8cc74b05d5..3f078010000 100644
--- a/sdk/python/tests/compiler/testdata/basic.py
+++ b/sdk/python/tests/compiler/testdata/basic.py
@@ -85,3 +85,5 @@ def save_most_frequent_word(message: dsl.PipelineParam, outputpath: dsl.Pipeline
           message=counter.output,
           output_path=outputpath)
     saver.set_cpu_limit('0.5')
+    saver.set_gpu_limit('2')
+    saver.add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-k80')
diff --git a/sdk/python/tests/compiler/testdata/basic.yaml b/sdk/python/tests/compiler/testdata/basic.yaml
index 2e0783fb41b..f9da8393bba 100644
--- a/sdk/python/tests/compiler/testdata/basic.yaml
+++ b/sdk/python/tests/compiler/testdata/basic.yaml
@@ -141,11 +141,14 @@ spec:
       resources:
         limits:
           cpu: "0.5"
+          nvidia.com/gpu: "2"
     inputs:
       parameters:
       - name: get-frequent-word
       - name: outputpath
     name: save
+    nodeSelector:
+      cloud.google.com/gke-accelerator: nvidia-tesla-k80
     outputs:
       artifacts:
       - name: mlpipeline-ui-metadata