Skip to content

Commit

Permalink
Allow users to specify resource constraints in platform_config for V2
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 455506159
  • Loading branch information
tfx-copybara committed Jun 17, 2022
1 parent 8d9226f commit b15d592
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 27 deletions.
6 changes: 2 additions & 4 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

* Added experimental exit_handler support for KubeflowDagRunner.
* Enabled custom labels to be submitted to CAIP training jobs.
* Enabled custom Python function-based components to share pipeline Beam
  configuration by
  [inheriting from BaseBeamComponent](https://www.tensorflow.org/tfx/guide/custom_function_component).
* Support dynamic exec properties in TFX pipeline. Downstream component parameter could take the upstream output using Placeholder.
* Enabled custom resource-setting (vCPU and RAM) for containers orchestrating
on Vertex AI.

## Breaking Changes

Expand Down
14 changes: 6 additions & 8 deletions tfx/orchestration/kubeflow/v2/kubeflow_v2_dag_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
# Current schema version for the API proto.
_SCHEMA_VERSION = '2.0.0'


# Default TFX container image/commands to use in KubeflowV2DagRunner.
_KUBEFLOW_TFX_IMAGE = 'gcr.io/tfx-oss-public/tfx:{}'.format(
version_utils.get_image_version())
Expand Down Expand Up @@ -69,11 +68,11 @@ def __init__(self,
default_commands: Optionally specifies the commands of the provided
container image. When not provided, the default `ENTRYPOINT` specified
in the docker image is used. Note: the commands here refers to the K8S
container command, which maps to Docker entrypoint field. If one
supplies command but no args are provided for the container, the
container will be invoked with the provided command, ignoring the
`ENTRYPOINT` and `CMD` defined in the Dockerfile. One can find more
details regarding the difference between K8S and Docker conventions at
container command, which maps to Docker entrypoint field. If one
supplies command but no args are provided for the container, the
container will be invoked with the provided command, ignoring the
`ENTRYPOINT` and `CMD` defined in the Dockerfile. One can find more
details regarding the difference between K8S and Docker conventions at
https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes
**kwargs: Additional args passed to base PipelineConfig.
"""
Expand Down Expand Up @@ -115,8 +114,7 @@ def __init__(self,
self._output_filename = output_filename or 'pipeline.json'
self._exit_handler = None

def set_exit_handler(self,
exit_handler: base_node.BaseNode):
def set_exit_handler(self, exit_handler: base_node.BaseNode):
"""Set exit handler components for the Kubeflow V2(Vertex AI) dag runner.
This feature is currently experimental without backward compatibility
Expand Down
52 changes: 38 additions & 14 deletions tfx/orchestration/kubeflow/v2/step_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,22 @@ def __init__(self,
Args:
node: A TFX node. The logical unit of a step. Note, currently for resolver
node we only support two types of resolver
policies, including: 1) latest blessed model, and 2) latest model
artifact.
node we only support two types of resolver policies, including: 1)
latest blessed model, and 2) latest model artifact.
deployment_config: The deployment config in Kubeflow IR to be populated.
component_defs: Dict mapping from node id to compiled ComponentSpec proto.
Items in the dict will get updated as the pipeline is built.
dsl_context_reg: A DslContextRegistry instance from
Pipeline.dsl_context_registry.
image: TFX image used in the underlying container spec. Required if node
is a TFX component.
image_cmds: Optional. If not specified the default `ENTRYPOINT` defined
in the docker image will be used. Note: the commands here refers to the
K8S container command, which maps to Docker entrypoint field. If one
supplies command but no args are provided for the container, the
container will be invoked with the provided command, ignoring the
`ENTRYPOINT` and `CMD` defined in the Dockerfile. One can find more
details regarding the difference between K8S and Docker conventions at
image_cmds: Optional. If not specified the default `ENTRYPOINT` defined in
the docker image will be used. Note: the commands here refers to the K8S
container command, which maps to Docker entrypoint field. If one
supplies command but no args are provided for the container, the
container will be invoked with the provided command, ignoring the
`ENTRYPOINT` and `CMD` defined in the Dockerfile. One can find more
details regarding the difference between K8S and Docker conventions at
https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes
beam_pipeline_args: Pipeline arguments for Beam powered Components.
enable_cache: If true, enables cache lookup for this pipeline step.
Expand Down Expand Up @@ -261,8 +260,8 @@ def build(self) -> Dict[str, pipeline_pb2.PipelineTaskSpec]:
# Conditionals
implicit_input_channels = {}
implicit_upstream_node_ids = set()
predicates = conditional.get_predicates(
self._node, self._dsl_context_registry)
predicates = conditional.get_predicates(self._node,
self._dsl_context_registry)
if predicates:
implicit_keys_map = {
tfx_compiler_utils.implicit_channel_key(channel): key
Expand Down Expand Up @@ -404,7 +403,29 @@ def _build_container_spec(self) -> ContainerSpec:
NotImplementedError: When the executor class is neither ExecutorClassSpec
nor TemplatedExecutorContainerSpec.
"""

assert isinstance(self._node, base_component.BaseComponent)

if self._node.platform_config:
logging.info(
'ResourceSpec with container execution parameters has been passed via platform_config'
)
assert isinstance(
self._node.platform_config, pipeline_pb2.PipelineDeploymentConfig
.PipelineContainerSpec.ResourceSpec
), ('platform_config, if set by the user, must be a ResourceSpec proto '
'specifying vCPU and vRAM requirements')
cpu_limit = self._node.platform_config.cpu_limit
memory_limit = self._node.platform_config.memory_limit
if cpu_limit:
assert (cpu_limit >= 0), ('vCPU must be non-negative')
if memory_limit:
assert (memory_limit >= 0), ('vRAM must be non-negative')

if self._node.platform_config.accelerator.type:
assert (self._node.platform_config.accelerator.count >=
0), ('GPU type and count must be set')

if isinstance(self._node.executor_spec,
executor_specs.TemplatedExecutorContainerSpec):
container_spec = self._node.executor_spec
Expand All @@ -413,8 +434,9 @@ def _build_container_spec(self) -> ContainerSpec:
command=_resolve_command_line(
container_spec=container_spec,
exec_properties=self._node.exec_properties,
),
)
))
if self._node.platform_config:
result.resources.CopyFrom(self._node.platform_config)
return result

# The container entrypoint format below assumes ExecutorClassSpec.
Expand All @@ -438,6 +460,8 @@ def _build_container_spec(self) -> ContainerSpec:
result.args.append('{{$}}')
result.args.extend(self._beam_pipeline_args)

if self._node.platform_config:
result.resources.CopyFrom(self._node.platform_config)
return result

def _build_file_based_example_gen_spec(self) -> ContainerSpec:
Expand Down
4 changes: 3 additions & 1 deletion tfx/orchestration/kubeflow/v2/step_builder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def _sole(self, d: Dict[Any, Any]) -> Any:
def testBuildTask(self):
query = 'SELECT * FROM TABLE'
bq_example_gen = big_query_example_gen_component.BigQueryExampleGen(
query=query)
query=query).with_platform_config(
pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec
.ResourceSpec(cpu_limit=5.0, memory_limit=10.0))
deployment_config = pipeline_pb2.PipelineDeploymentConfig()
component_defs = {}
my_builder = step_builder.StepBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ executors {
args: "tfx.extensions.google_cloud_big_query.example_gen.executor.Executor"
args: "--json_serialized_invocation_args"
args: "{{$}}"
resources {
cpu_limit: 5.0
memory_limit: 10.0
}
}
}
}

0 comments on commit b15d592

Please sign in to comment.