diff --git a/components/google-cloud/google_cloud_pipeline_components/experimental/custom_job/custom_job.py b/components/google-cloud/google_cloud_pipeline_components/experimental/custom_job/custom_job.py index 8a7f4d674ff..0c3dedd6792 100644 --- a/components/google-cloud/google_cloud_pipeline_components/experimental/custom_job/custom_job.py +++ b/components/google-cloud/google_cloud_pipeline_components/experimental/custom_job/custom_job.py @@ -42,6 +42,9 @@ def run_as_vertex_ai_custom_job( service_account: Optional[str] = None, network: Optional[str] = None, worker_pool_specs: Optional[List[Mapping[str, Any]]] = None, + encryption_spec_key_name: Optional[str] = None, + tensorboard: Optional[str] = None, + base_output_directory: Optional[str] = None, ) -> Callable: """Run a pipeline task using AI Platform (Unified) custom training job. @@ -70,52 +73,57 @@ def run_as_vertex_ai_custom_job( restart_job_on_worker_restart: Optional. Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient to workers leaving and joining a job. - service_account: Optional. Specifies the service account for workload run-as - account. + service_account: Optional. Sets the default service account for workload + run-as account. network: Optional. The full name of the Compute Engine network to which the job should be peered. For example, projects/12345/global/networks/myVPC. worker_pool_specs: Optional, worker_pool_specs for distributed training. this will overwite all other cluster configurations. For details, please see: https://cloud.google.com/ai-platform-unified/docs/training/distributed-training + encryption_spec_key_name: Optional, customer-managed encryption key options for + the CustomJob. If this is set, then all resources created by the CustomJob will + be encrypted with the provided encryption key. + tensorboard: The name of a Vertex AI Tensorboard resource to which this + CustomJob will upload Tensorboard logs. + base_output_directory: The Cloud Storage location to store the output of + this CustomJob or HyperparameterTuningJob. For HyperparameterTuningJob, + the baseOutputDirectory of each child CustomJob backing a Trial is set + to a subdirectory of name [id][Trial.id] under its parent + HyperparameterTuningJob's baseOutputDirectory. Returns: A Custom Job component OP correspoinding to the input component OP. """ job_spec = {} + input_specs = component_spec.component_spec.inputs if worker_pool_specs is not None: worker_pool_specs = copy.deepcopy(worker_pool_specs) def _is_output_parameter(output_key: str) -> bool: - return output_key in ( - component_spec.component_spec.output_definitions.parameters. - keys() - ) + return output_key in (component_spec.component_spec + .output_definitions.parameters.keys()) for worker_pool_spec in worker_pool_specs: if 'container_spec' in worker_pool_spec: container_spec = worker_pool_spec['container_spec'] if 'command' in container_spec: - dsl_utils.resolve_cmd_lines( - container_spec['command'], _is_output_parameter - ) + dsl_utils.resolve_cmd_lines(container_spec['command'], + _is_output_parameter) if 'args' in container_spec: - dsl_utils.resolve_cmd_lines( - container_spec['args'], _is_output_parameter - ) + dsl_utils.resolve_cmd_lines(container_spec['args'], + _is_output_parameter) elif 'python_package_spec' in worker_pool_spec: # For custom Python training, resolve placeholders in args only. 
python_spec = worker_pool_spec['python_package_spec'] if 'args' in python_spec: - dsl_utils.resolve_cmd_lines( - python_spec['args'], _is_output_parameter - ) + dsl_utils.resolve_cmd_lines(python_spec['args'], + _is_output_parameter) else: raise ValueError( 'Expect either "container_spec" or "python_package_spec" in each ' - 'workerPoolSpec. Got: {}'.format(worker_pool_spec) - ) + 'workerPoolSpec. Got: {}'.format(worker_pool_spec)) job_spec['worker_pool_specs'] = worker_pool_specs @@ -134,32 +142,30 @@ def _is_output_parameter(output_key: str) -> bool: 'replica_count': 1, 'container_spec': { 'image_uri': - component_spec.component_spec.implementation.container. - image, + component_spec.component_spec.implementation.container + .image, } } if component_spec.component_spec.implementation.container.command: container_command_copy = component_spec.component_spec.implementation.container.command.copy( ) - dsl_utils.resolve_cmd_lines( - container_command_copy, _is_output_parameter - ) - worker_pool_spec['container_spec']['command' - ] = container_command_copy + dsl_utils.resolve_cmd_lines(container_command_copy, + _is_output_parameter) + worker_pool_spec['container_spec'][ + 'command'] = container_command_copy if component_spec.component_spec.implementation.container.args: container_args_copy = component_spec.component_spec.implementation.container.args.copy( ) - dsl_utils.resolve_cmd_lines( - container_args_copy, _is_output_parameter - ) + dsl_utils.resolve_cmd_lines(container_args_copy, + _is_output_parameter) worker_pool_spec['container_spec']['args'] = container_args_copy if accelerator_type is not None: - worker_pool_spec['machine_spec']['accelerator_type' - ] = accelerator_type + worker_pool_spec['machine_spec'][ + 'accelerator_type'] = accelerator_type if accelerator_count is not None: - worker_pool_spec['machine_spec']['accelerator_count' - ] = accelerator_count + worker_pool_spec['machine_spec'][ + 'accelerator_count'] = accelerator_count if boot_disk_type is not None: if 'disk_spec' not in worker_pool_spec: worker_pool_spec['disk_spec'] = {} @@ -167,15 +173,14 @@ def _is_output_parameter(output_key: str) -> bool: if boot_disk_size_gb is not None: if 'disk_spec' not in worker_pool_spec: worker_pool_spec['disk_spec'] = {} - worker_pool_spec['disk_spec']['boot_disk_size_gb' - ] = boot_disk_size_gb + worker_pool_spec['disk_spec'][ + 'boot_disk_size_gb'] = boot_disk_size_gb job_spec['worker_pool_specs'] = [worker_pool_spec] if replica_count is not None and replica_count > 1: additional_worker_pool_spec = copy.deepcopy(worker_pool_spec) - additional_worker_pool_spec['replica_count'] = str( - replica_count - 1 - ) + additional_worker_pool_spec['replica_count'] = str(replica_count - + 1) job_spec['worker_pool_specs'].append(additional_worker_pool_spec) if timeout is not None: @@ -185,13 +190,23 @@ def _is_output_parameter(output_key: str) -> bool: if restart_job_on_worker_restart is not None: if 'scheduling' not in job_spec: job_spec['scheduling'] = {} - job_spec['scheduling']['restart_job_on_worker_restart' - ] = restart_job_on_worker_restart - if service_account is not None: - job_spec['service_account'] = service_account - if network is not None: - job_spec['network'] = network + job_spec['scheduling'][ + 'restart_job_on_worker_restart'] = restart_job_on_worker_restart + # Remove any existing service_account from component input list. 
+ input_specs[:] = [ + input_spec for input_spec in input_specs + if input_spec.name not in ('service_account', 'network', + 'encryption_spec_key_name', 'tensorboard', + 'base_output_directory') + ] + job_spec['service_account'] = "{{$.inputs.parameters['service_account}']}}" + job_spec['network'] = "{{$.inputs.parameters['network}']}}" + job_spec[ + 'encryption_spec_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name}']}}" + job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard}']}}" + job_spec[ + 'base_output_directory'] = "{{$.inputs.parameters['base_output_directory}']}}" custom_job_payload = { 'display_name': display_name or component_spec.component_spec.name, 'job_spec': job_spec @@ -199,7 +214,29 @@ def _is_output_parameter(output_key: str) -> bool: custom_job_component_spec = structures.ComponentSpec( name=component_spec.component_spec.name, - inputs=component_spec.component_spec.inputs + [ + inputs=input_specs + [ + structures.InputSpec( + name='base_output_directory', + type='String', + optional=True, + default=base_output_directory), + structures.InputSpec( + name='tensorboard', + type='String', + optional=True, + default=tensorboard), + structures.InputSpec( + name='encryption_spec_key_name', + type='String', + optional=True, + default=encryption_spec_key_name), + structures.InputSpec( + name='network', type='String', optional=True, default=network), + structures.InputSpec( + name='service_account', + type='String', + optional=True, + default=service_account), structures.InputSpec(name='project', type='String'), structures.InputSpec(name='location', type='String') ], @@ -223,12 +260,9 @@ def _is_output_parameter(output_key: str) -> bool: structures.InputValuePlaceholder(input_name='location'), '--gcp_resources', structures.OutputPathPlaceholder( - output_name='gcp_resources' - ), + output_name='gcp_resources'), ], - ) - ) - ) + ))) component_path = tempfile.mktemp() custom_job_component_spec.save(component_path) diff --git a/components/google-cloud/tests/experimental/custom_job/test_custom_job.py b/components/google-cloud/tests/experimental/custom_job/test_custom_job.py index d3909d7b22e..884092ec71f 100644 --- a/components/google-cloud/tests/experimental/custom_job/test_custom_job.py +++ b/components/google-cloud/tests/experimental/custom_job/test_custom_job.py @@ -28,8 +28,7 @@ def setUp(self): def _create_a_container_based_component(self) -> callable: """Creates a test container based component factory.""" - return components.load_component_from_text( - """ + return components.load_component_from_text(""" name: ContainerComponent inputs: - {name: input_text, type: String, description: "Represents an input parameter."} @@ -46,8 +45,7 @@ def _create_a_container_based_component(self) -> callable: echo "$0, this is an output parameter" - {inputValue: input_text} - {outputPath: output_value} -""" - ) +""") def _create_a_pytnon_based_component(self) -> callable: """Creates a test python based component factory.""" @@ -59,14 +57,33 @@ def sum_numbers(a: int, b: int) -> int: return sum_numbers def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_converts_correctly( - self - ): + self): expected_results = { 'name': 'ContainerComponent', 'inputs': [{ 'name': 'input_text', 'type': 'String', 'description': 'Represents an input parameter.' 
+ }, { + 'name': 'base_output_directory', + 'type': 'String', + 'optional': True + }, { + 'name': 'tensorboard', + 'type': 'String', + 'optional': True + }, { + 'name': 'encryption_spec_key_name', + 'type': 'String', + 'optional': True + }, { + 'name': 'network', + 'type': 'String', + 'optional': True + }, { + 'name': 'service_account', + 'type': 'String', + 'optional': True }, { 'name': 'project', 'type': 'String' @@ -92,7 +109,7 @@ def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_conv ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -106,16 +123,13 @@ def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_conv } component_factory_function = self._create_a_container_based_component() custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function - ) - - self.assertDictEqual( - custom_job_spec.component_spec.to_dict(), expected_results - ) + component_factory_function) + print(custom_job_spec.component_spec.to_dict()) + self.assertDictEqual(custom_job_spec.component_spec.to_dict(), + expected_results) def test_run_as_vertex_ai_custom_job_on_python_spec_with_defualts_values_converts_correctly( - self - ): + self): # TODO enable after issue kfp release to support executor input. 
return expected_results = { @@ -162,16 +176,13 @@ def test_run_as_vertex_ai_custom_job_on_python_spec_with_defualts_values_convert } component_factory_function = self._create_a_pytnon_based_component() custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function - ) + component_factory_function) - self.assertDictContainsSubset( - custom_job_spec.component_spec.to_dict(), expected_results - ) + self.assertDictContainsSubset(custom_job_spec.component_spec.to_dict(), + expected_results) def test_run_as_vertex_ai_custom_with_worker_poolspec_container_spec_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() worker_pool_spec = [{ 'machine_spec': { @@ -196,7 +207,7 @@ def test_run_as_vertex_ai_custom_with_worker_poolspec_container_spec_converts_co ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "test_machine_type"}, "replica_count": 2, "container_spec": {"image_uri": "test_image_uri", "command": ["test_command"], "args": ["test_args"]}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "test_machine_type"}, "replica_count": 2, "container_spec": {"image_uri": "test_image_uri", "command": ["test_command"], "args": ["test_args"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -209,17 +220,14 @@ def test_run_as_vertex_ai_custom_with_worker_poolspec_container_spec_converts_co } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, worker_pool_specs=worker_pool_spec - ) + component_factory_function, worker_pool_specs=worker_pool_spec) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_python_package_spec_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() python_package_spec = [{'python_package_spec': {'args': ['test_args']}}] @@ -234,7 +242,7 @@ def test_run_as_vertex_ai_custom_with_python_package_spec_converts_correctly( ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"python_package_spec": {"args": ["test_args"]}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"python_package_spec": {"args": ["test_args"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -247,17 +255,14 @@ def test_run_as_vertex_ai_custom_with_python_package_spec_converts_correctly( } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, 
worker_pool_specs=python_package_spec - ) + component_factory_function, worker_pool_specs=python_package_spec) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_accelerator_type_and_count_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() expected_sub_results = { @@ -271,7 +276,7 @@ def test_run_as_vertex_ai_custom_with_accelerator_type_and_count_converts_correc ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4", "accelerator_type": "test_accelerator_type", "accelerator_count": 2}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4", "accelerator_type": "test_accelerator_type", "accelerator_count": 2}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -286,17 +291,14 @@ def test_run_as_vertex_ai_custom_with_accelerator_type_and_count_converts_correc custom_job_spec = custom_job.run_as_vertex_ai_custom_job( component_factory_function, accelerator_type="test_accelerator_type", - accelerator_count=2 - ) + accelerator_count=2) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_boot_disk_type_and_size_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() expected_sub_results = { @@ -310,7 +312,7 @@ def test_run_as_vertex_ai_custom_with_boot_disk_type_and_size_converts_correctly ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}, "disk_spec": {"boot_disk_type": "test_boot_disc_type", "boot_disk_size_gb": 2}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output 
parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}, {"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": "1", "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -323,19 +325,14 @@ def test_run_as_vertex_ai_custom_with_boot_disk_type_and_size_converts_correctly } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, - boot_disk_type='test_boot_disc_type', - boot_disk_size_gb=2 - ) + component_factory_function, replica_count=2) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_replica_count_greater_than_1_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() expected_sub_results = { @@ -349,7 +346,7 @@ def test_run_as_vertex_ai_custom_with_replica_count_greater_than_1_converts_corr ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}, {"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": "1", "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}]}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}, {"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": "1", "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -362,13 
+359,11 @@ def test_run_as_vertex_ai_custom_with_replica_count_greater_than_1_converts_corr } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, replica_count=2 - ) + component_factory_function, replica_count=2) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_time_out_converts_correctly(self): component_factory_function = self._create_a_container_based_component() @@ -384,7 +379,7 @@ def test_run_as_vertex_ai_custom_with_time_out_converts_correctly(self): ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "scheduling": {"timeout": 2}}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "scheduling": {"timeout": 2}, "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -397,17 +392,14 @@ def test_run_as_vertex_ai_custom_with_time_out_converts_correctly(self): } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, timeout=2 - ) + component_factory_function, timeout=2) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_restart_job_on_worker_restart_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() expected_sub_results = { @@ -421,7 +413,7 @@ def test_run_as_vertex_ai_custom_with_restart_job_on_worker_restart_converts_cor ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "scheduling": {"restart_job_on_worker_restart": true}}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", 
"{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "scheduling": {"restart_job_on_worker_restart": true}, "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -434,17 +426,14 @@ def test_run_as_vertex_ai_custom_with_restart_job_on_worker_restart_converts_cor } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, restart_job_on_worker_restart=True - ) + component_factory_function, restart_job_on_worker_restart=True) self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_custom_service_account_converts_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() expected_sub_results = { @@ -458,7 +447,7 @@ def test_run_as_vertex_ai_custom_with_custom_service_account_converts_correctly( ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "test_service_account"}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -471,13 +460,11 @@ def test_run_as_vertex_ai_custom_with_custom_service_account_converts_correctly( } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, service_account='test_service_account' - ) + component_factory_function, service_account='test_service_account') self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_with_display_name_converts_correctly(self): component_factory_function = self._create_a_container_based_component() @@ -493,7 +480,7 @@ def test_run_as_vertex_ai_custom_with_display_name_converts_correctly(self): ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "test_display_name", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": 
"google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}]}}', + '{"display_name": "test_display_name", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ -506,17 +493,14 @@ def test_run_as_vertex_ai_custom_with_display_name_converts_correctly(self): } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, display_name='test_display_name' - ) + component_factory_function, display_name='test_display_name') self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict()) def test_run_as_vertex_ai_custom_without_container_spec_or_python_package_spec_correctly( - self - ): + self): component_factory_function = self._create_a_container_based_component() worker_pool_spec = [{ @@ -527,8 +511,7 @@ def test_run_as_vertex_ai_custom_without_container_spec_or_python_package_spec_c }] with self.assertRaises(ValueError): custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, worker_pool_specs=worker_pool_spec - ) + component_factory_function, worker_pool_specs=worker_pool_spec) def test_run_as_vertex_ai_custom_with_network_converts_correctly(self): component_factory_function = self._create_a_container_based_component() @@ -544,7 +527,7 @@ def test_run_as_vertex_ai_custom_with_network_converts_correctly(self): ], 'args': [ '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "network": "test_network"}}', + '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}', '--project', { 'inputValue': 'project' }, '--location', { @@ 
-557,10 +540,8 @@ def test_run_as_vertex_ai_custom_with_network_converts_correctly(self): } } custom_job_spec = custom_job.run_as_vertex_ai_custom_job( - component_factory_function, network='test_network' - ) + component_factory_function, network='test_network') self.assertDictContainsSubset( subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict() - ) + dictionary=custom_job_spec.component_spec.to_dict())
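
A minimal usage sketch (editorial, not part of the diff) of the knobs this change adds to run_as_vertex_ai_custom_job: encryption_spec_key_name, tensorboard, and base_output_directory, with service_account and network now also surfaced as optional component inputs. Every concrete resource name below (project, VPC, KMS key, Tensorboard instance, Cloud Storage bucket, service account) is a hypothetical placeholder, and the snippet assumes the KFP SDK these experimental components target, with its v2 DSL namespace.

# Hedged usage sketch for the options introduced above; all resource names
# are placeholders, not values taken from this change.
from kfp import components
from kfp.v2 import dsl

from google_cloud_pipeline_components.experimental.custom_job import custom_job

# Any container- or Python-based component can be wrapped; a trivial one here.
train_op = components.load_component_from_text("""
name: TrainComponent
inputs:
- {name: input_text, type: String}
outputs:
- {name: output_value, type: String}
implementation:
  container:
    image: google/cloud-sdk:latest
    command: [sh, -c, 'echo "$0" > "$1"', {inputValue: input_text}, {outputPath: output_value}]
""")

# The wrapper now accepts encryption, Tensorboard, and output-directory
# settings; each becomes an optional component input with the given default.
custom_train_op = custom_job.run_as_vertex_ai_custom_job(
    train_op,
    display_name='train-with-cmek',
    service_account='trainer@my-project.iam.gserviceaccount.com',  # placeholder
    network='projects/12345/global/networks/myVPC',                # placeholder
    encryption_spec_key_name=(
        'projects/my-project/locations/us-central1/'
        'keyRings/my-ring/cryptoKeys/my-key'),                     # placeholder
    tensorboard=(
        'projects/my-project/locations/us-central1/tensorboards/123'),  # placeholder
    base_output_directory='gs://my-bucket/custom-job-output',      # placeholder
)


@dsl.pipeline(name='custom-job-example')
def pipeline(project: str = 'my-project', location: str = 'us-central1'):
    # project and location are the required inputs added by the wrapper; the
    # values configured above flow in through the new optional inputs.
    custom_train_op(input_text='hello', project=project, location=location)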