Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(components/google-cloud): Expose all Custom Job parameters via Custom Job wrapper #6539

Merged
merged 4 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def run_as_vertex_ai_custom_job(
service_account: Optional[str] = None,
network: Optional[str] = None,
worker_pool_specs: Optional[List[Mapping[str, Any]]] = None,
encryption_spec_key_name: Optional[str] = None,
SinaChavoshi marked this conversation as resolved.
Show resolved Hide resolved
tensorboard: Optional[str] = None,
base_output_directory: Optional[str] = None,
) -> Callable:
"""Run a pipeline task using AI Platform (Unified) custom training job.

Expand Down Expand Up @@ -70,52 +73,57 @@ def run_as_vertex_ai_custom_job(
restart_job_on_worker_restart: Optional. Restarts the entire CustomJob if a
worker gets restarted. This feature can be used by distributed training
jobs that are not resilient to workers leaving and joining a job.
service_account: Optional. Specifies the service account for workload run-as
account.
service_account: Optional. Sets the default service account for workload
run-as account.
network: Optional. The full name of the Compute Engine network to which the
job should be peered. For example, projects/12345/global/networks/myVPC.
worker_pool_specs: Optional, worker_pool_specs for distributed training. This
will overwrite all other cluster configurations. For details, please see:
https://cloud.google.com/ai-platform-unified/docs/training/distributed-training
encryption_spec_key_name: Optional, customer-managed encryption key options for
the CustomJob. If this is set, then all resources created by the CustomJob will
be encrypted with the provided encryption key.
tensorboard: The name of a Vertex AI Tensorboard resource to which this
CustomJob will upload Tensorboard logs.
base_output_directory: The Cloud Storage location to store the output of
this CustomJob or HyperparameterTuningJob. For HyperparameterTuningJob,
the baseOutputDirectory of each child CustomJob backing a Trial is set
to a subdirectory of name [id][Trial.id] under its parent
HyperparameterTuningJob's baseOutputDirectory.
Returns:
A Custom Job component OP corresponding to the input component OP.
"""
job_spec = {}
input_specs = component_spec.component_spec.inputs

if worker_pool_specs is not None:
worker_pool_specs = copy.deepcopy(worker_pool_specs)

def _is_output_parameter(output_key: str) -> bool:
return output_key in (
component_spec.component_spec.output_definitions.parameters.
keys()
)
return output_key in (component_spec.component_spec
.output_definitions.parameters.keys())

for worker_pool_spec in worker_pool_specs:
if 'container_spec' in worker_pool_spec:
container_spec = worker_pool_spec['container_spec']
if 'command' in container_spec:
dsl_utils.resolve_cmd_lines(
container_spec['command'], _is_output_parameter
)
dsl_utils.resolve_cmd_lines(container_spec['command'],
_is_output_parameter)
if 'args' in container_spec:
dsl_utils.resolve_cmd_lines(
container_spec['args'], _is_output_parameter
)
dsl_utils.resolve_cmd_lines(container_spec['args'],
_is_output_parameter)

elif 'python_package_spec' in worker_pool_spec:
# For custom Python training, resolve placeholders in args only.
python_spec = worker_pool_spec['python_package_spec']
if 'args' in python_spec:
dsl_utils.resolve_cmd_lines(
python_spec['args'], _is_output_parameter
)
dsl_utils.resolve_cmd_lines(python_spec['args'],
_is_output_parameter)

else:
raise ValueError(
'Expect either "container_spec" or "python_package_spec" in each '
'workerPoolSpec. Got: {}'.format(worker_pool_spec)
)
'workerPoolSpec. Got: {}'.format(worker_pool_spec))

job_spec['worker_pool_specs'] = worker_pool_specs

Expand All @@ -134,48 +142,45 @@ def _is_output_parameter(output_key: str) -> bool:
'replica_count': 1,
'container_spec': {
'image_uri':
component_spec.component_spec.implementation.container.
image,
component_spec.component_spec.implementation.container
.image,
}
}
if component_spec.component_spec.implementation.container.command:
container_command_copy = component_spec.component_spec.implementation.container.command.copy(
)
dsl_utils.resolve_cmd_lines(
container_command_copy, _is_output_parameter
)
worker_pool_spec['container_spec']['command'
] = container_command_copy
dsl_utils.resolve_cmd_lines(container_command_copy,
_is_output_parameter)
worker_pool_spec['container_spec'][
'command'] = container_command_copy

if component_spec.component_spec.implementation.container.args:
container_args_copy = component_spec.component_spec.implementation.container.args.copy(
)
dsl_utils.resolve_cmd_lines(
container_args_copy, _is_output_parameter
)
dsl_utils.resolve_cmd_lines(container_args_copy,
_is_output_parameter)
worker_pool_spec['container_spec']['args'] = container_args_copy
if accelerator_type is not None:
worker_pool_spec['machine_spec']['accelerator_type'
] = accelerator_type
worker_pool_spec['machine_spec'][
'accelerator_type'] = accelerator_type
if accelerator_count is not None:
worker_pool_spec['machine_spec']['accelerator_count'
] = accelerator_count
worker_pool_spec['machine_spec'][
'accelerator_count'] = accelerator_count
if boot_disk_type is not None:
if 'disk_spec' not in worker_pool_spec:
worker_pool_spec['disk_spec'] = {}
worker_pool_spec['disk_spec']['boot_disk_type'] = boot_disk_type
if boot_disk_size_gb is not None:
if 'disk_spec' not in worker_pool_spec:
worker_pool_spec['disk_spec'] = {}
worker_pool_spec['disk_spec']['boot_disk_size_gb'
] = boot_disk_size_gb
worker_pool_spec['disk_spec'][
'boot_disk_size_gb'] = boot_disk_size_gb

job_spec['worker_pool_specs'] = [worker_pool_spec]
if replica_count is not None and replica_count > 1:
additional_worker_pool_spec = copy.deepcopy(worker_pool_spec)
additional_worker_pool_spec['replica_count'] = str(
replica_count - 1
)
additional_worker_pool_spec['replica_count'] = str(replica_count -
1)
job_spec['worker_pool_specs'].append(additional_worker_pool_spec)

if timeout is not None:
Expand All @@ -185,21 +190,53 @@ def _is_output_parameter(output_key: str) -> bool:
if restart_job_on_worker_restart is not None:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}
job_spec['scheduling']['restart_job_on_worker_restart'
] = restart_job_on_worker_restart
if service_account is not None:
job_spec['service_account'] = service_account
if network is not None:
job_spec['network'] = network
job_spec['scheduling'][
'restart_job_on_worker_restart'] = restart_job_on_worker_restart

# Remove any existing service_account from component input list.
input_specs[:] = [
input_spec for input_spec in input_specs
if input_spec.name not in ('service_account', 'network',
'encryption_spec_key_name', 'tensorboard',
'base_output_directory')
]
job_spec['service_account'] = "{{$.inputs.parameters['service_account']}}"
job_spec['network'] = "{{$.inputs.parameters['network']}}"
job_spec[
'encryption_spec_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name']}}"
job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard']}}"
job_spec[
'base_output_directory'] = "{{$.inputs.parameters['base_output_directory']}}"
custom_job_payload = {
'display_name': display_name or component_spec.component_spec.name,
'job_spec': job_spec
}

custom_job_component_spec = structures.ComponentSpec(
name=component_spec.component_spec.name,
inputs=component_spec.component_spec.inputs + [
inputs=input_specs + [
structures.InputSpec(
name='base_output_directory',
type='String',
optional=True,
default=base_output_directory),
structures.InputSpec(
name='tensorboard',
type='String',
optional=True,
default=tensorboard),
structures.InputSpec(
name='encryption_spec_key_name',
type='String',
optional=True,
default=encryption_spec_key_name),
structures.InputSpec(
name='network', type='String', optional=True, default=network),
structures.InputSpec(
name='service_account',
type='String',
optional=True,
default=service_account),
structures.InputSpec(name='project', type='String'),
structures.InputSpec(name='location', type='String')
],
Expand All @@ -223,12 +260,9 @@ def _is_output_parameter(output_key: str) -> bool:
structures.InputValuePlaceholder(input_name='location'),
'--gcp_resources',
structures.OutputPathPlaceholder(
output_name='gcp_resources'
),
output_name='gcp_resources'),
],
)
)
)
)))
component_path = tempfile.mktemp()
custom_job_component_spec.save(component_path)

Expand Down
Loading