From 4ce4ac0043d47a608872ca79d928f1c030dd57be Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Wed, 27 Feb 2019 22:19:20 -0800 Subject: [PATCH 1/3] Dump job id and change output to /tmp/kfp/output --- .../python/kfp_component/google/dataflow/_common_ops.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/component_sdk/python/kfp_component/google/dataflow/_common_ops.py b/component_sdk/python/kfp_component/google/dataflow/_common_ops.py index d5a119b834c..9fbae8974bb 100644 --- a/component_sdk/python/kfp_component/google/dataflow/_common_ops.py +++ b/component_sdk/python/kfp_component/google/dataflow/_common_ops.py @@ -108,7 +108,8 @@ def display_job_link(project_id, job): )) def dump_job(job): - gcp_common.dump_file('/tmp/output/job.json', json.dumps(job)) + gcp_common.dump_file('/tmp/kfp/output/dataflow/job.json', json.dumps(job)) + gcp_common.dump_file('/tmp/kfp/output/dataflow/job_id.txt', job.get('id')) def stage_file(local_or_gcs_path): if not is_gcs_path(local_or_gcs_path): From f870afef49a6788ad4049b2cde279371dd772192 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Thu, 28 Feb 2019 15:57:43 -0800 Subject: [PATCH 2/3] fix test error --- .../python/tests/google/dataflow/test__launch_python.py | 2 ++ .../python/tests/google/dataflow/test__launch_template.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/component_sdk/python/tests/google/dataflow/test__launch_python.py b/component_sdk/python/tests/google/dataflow/test__launch_python.py index 246816dd106..e8f653862f6 100644 --- a/component_sdk/python/tests/google/dataflow/test__launch_python.py +++ b/component_sdk/python/tests/google/dataflow/test__launch_python.py @@ -38,6 +38,7 @@ def test_launch_python_succeed(self, mock_subprocess, mock_process, b'https://console.cloud.google.com/dataflow/locations/us-central1/jobs/job-1?project=project-1' ] expected_job = { + 'id': 'job-1', 'currentState': 'JOB_STATE_DONE' } mock_client().get_job.return_value = expected_job @@ -56,6 +57,7 @@ def 
test_launch_python_retry_succeed(self, mock_subprocess, mock_process, }] } expected_job = { + 'id': 'job-1', 'currentState': 'JOB_STATE_DONE' } mock_client().get_job.return_value = expected_job diff --git a/component_sdk/python/tests/google/dataflow/test__launch_template.py b/component_sdk/python/tests/google/dataflow/test__launch_template.py index d5ceff3bf32..138238ab3f9 100644 --- a/component_sdk/python/tests/google/dataflow/test__launch_template.py +++ b/component_sdk/python/tests/google/dataflow/test__launch_template.py @@ -34,6 +34,7 @@ def test_launch_template_succeed(self, mock_client, mock_context, mock_display): 'job': { 'id': 'job-1' } } expected_job = { + 'id': 'job-1', 'currentState': 'JOB_STATE_DONE' } mock_client().get_job.return_value = expected_job @@ -64,6 +65,7 @@ def test_launch_template_retry_succeed(self, 'currentState': 'JOB_STATE_PENDING' } expected_job = { + 'id': 'job-1', 'currentState': 'JOB_STATE_DONE' } mock_client().get_job.side_effect = [pending_job, expected_job] @@ -89,6 +91,7 @@ def test_launch_template_fail(self, mock_client, mock_context, mock_display): 'job': { 'id': 'job-1' } } failed_job = { + 'id': 'job-1', 'currentState': 'JOB_STATE_FAILED' } mock_client().get_job.return_value = failed_job From 0eedcec1bddbf987bc754f21a7dbea87ab19c620 Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Fri, 1 Mar 2019 11:25:36 -0800 Subject: [PATCH 3/3] Fix bug in validate_only mode --- .../python/kfp_component/google/dataflow/_launch_template.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/component_sdk/python/kfp_component/google/dataflow/_launch_template.py b/component_sdk/python/kfp_component/google/dataflow/_launch_template.py index 6d87dd730a1..4e950c355ee 100644 --- a/component_sdk/python/kfp_component/google/dataflow/_launch_template.py +++ b/component_sdk/python/kfp_component/google/dataflow/_launch_template.py @@ -69,6 +69,9 @@ def cancel(): launch_parameters['jobName'] = job_name response = 
df_client.launch_template(project_id, gcs_path, location, validate_only, launch_parameters) - job = response.get('job') + job = response.get('job', None) + if not job: + # Validate only mode + return job return wait_and_dump_job(df_client, project_id, location, job, wait_interval) \ No newline at end of file