From ec2f72eca8f76102e977f65299e06493a21a81a9 Mon Sep 17 00:00:00 2001
From: Hongye Sun
Date: Thu, 28 Feb 2019 16:53:06 -0800
Subject: [PATCH 1/5] Add BigQuery docstring and dump output path.

---
 .../kfp_component/google/bigquery/_query.py | 25 ++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/component_sdk/python/kfp_component/google/bigquery/_query.py b/component_sdk/python/kfp_component/google/bigquery/_query.py
index 536ca2d5be3..20bab11cdce 100644
--- a/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -22,6 +22,20 @@
 def query(query, project_id, dataset_id, table_id=None,
     output_gcs_path=None, job_config=None):
+    """Submit a query to the BigQuery service and dump outputs to a GCS blob.
+
+    Args:
+        query (str): The query used by the BigQuery service to fetch the results.
+        project_id (str): The ID of the project to execute the query job.
+        dataset_id (str): The ID of the persistent dataset to keep the results
+            of the query.
+        table_id (str): The ID of the table to keep the results of the query. If
+            absent, the operation will generate a random ID for the table.
+        output_gcs_path (str): The GCS blob path to dump the query results to.
+        job_config (dict): The full config spec for the query job.
+    Returns:
+        The API representation of the completed query job.
+    """
     client = bigquery.Client(project=project_id)
     if not job_config:
         job_config = bigquery.QueryJobConfig()
@@ -48,6 +62,7 @@ def cancel():
             if not extract_job:
                 extract_job = client.extract_table(table_ref, output_gcs_path)
             extract_job.result() # Wait for export to finish
+        _dump_outputs(query_job, output_gcs_path)
         return query_job.to_api_repr()

 def _get_job(client, job_id):
@@ -63,6 +78,10 @@ def _display_job_link(project_id, job_id):
         text='Query Details'
     ))

-def _dump_outputs(job):
-    gcp_common.dump_file('/tmp/outputs/bigquery-job.json',
-        json.dumps(job.to_api_repr()))
\ No newline at end of file
+def _dump_outputs(job, output_path):
+    gcp_common.dump_file('/tmp/kfp/output/bigquery/query-job.json',
+        json.dumps(job.to_api_repr()))
+    if not output_path:
+        output_path = ''
+    gcp_common.dump_file('/tmp/kfp/output/bigquery/query-output-path.txt',
+        output_path)
\ No newline at end of file

From 37300ec4a0acb5764190b601a84b6ed0740dfd6e Mon Sep 17 00:00:00 2001
From: Hongye Sun
Date: Fri, 1 Mar 2019 11:30:02 -0800
Subject: [PATCH 2/5] Auto-create dataset if it does not exist and dump
 results to local files

---
 .../kfp_component/google/bigquery/_query.py | 53 +++++++++++++++----
 .../tests/google/bigquery/test__query.py    | 34 ++++++++++--
 2 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/component_sdk/python/kfp_component/google/bigquery/_query.py b/component_sdk/python/kfp_component/google/bigquery/_query.py
index 20bab11cdce..a3c3d8f6c44 100644
--- a/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import json
+import logging

 from google.cloud import bigquery
 from google.api_core import exceptions
@@ -20,15 +21,16 @@
 from kfp_component.core import KfpExecutionContext, display
 from .. import common as gcp_common

-def query(query, project_id, dataset_id, table_id=None,
+def query(query, project_id, dataset_id=None, table_id=None,
     output_gcs_path=None, job_config=None):
     """Submit a query to the BigQuery service and dump outputs to a GCS blob.
-
+
     Args:
         query (str): The query used by the BigQuery service to fetch the results.
         project_id (str): The ID of the project to execute the query job.
         dataset_id (str): The ID of the persistent dataset to keep the results
-            of the query.
+            of the query. If the dataset does not exist, the operation will
+            create a new one.
         table_id (str): The ID of the table to keep the results of the query. If
             absent, the operation will generate a random ID for the table.
         output_gcs_path (str): The GCS blob path to dump the query results to.
         job_config (dict): The full config spec for the query job.
     Returns:
         The API representation of the completed query job.
     """
     client = bigquery.Client(project=project_id)
     if not job_config:
         job_config = bigquery.QueryJobConfig()
@@ -47,21 +49,28 @@ def cancel():
         client.cancel_job(job_id)
     with KfpExecutionContext(on_cancel=cancel) as ctx:
         job_id = 'query_' + ctx.context_id()
-        if not table_id:
-            table_id = 'table_' + ctx.context_id()
-        table_ref = client.dataset(dataset_id).table(table_id)
         query_job = _get_job(client, job_id)
+        table_ref = None
         if not query_job:
-            job_config.destination = table_ref
+            dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path)
+            if dataset_ref:
+                if not table_id:
+                    table_id = job_id
+                table_ref = dataset_ref.table(table_id)
+                job_config.destination = table_ref
             query_job = client.query(query, job_config, job_id=job_id)
             _display_job_link(project_id, job_id)
-        query_job.result()
+        query_result = query_job.result()
         if output_gcs_path:
             job_id = 'extract_' + ctx.context_id()
             extract_job = _get_job(client, job_id)
             if not extract_job:
                 extract_job = client.extract_table(table_ref, output_gcs_path)
-            extract_job.result() # Wait for export to finish
+            extract_job.result() # Wait for export to
+        else:
+            # Download results to local disk if no gcs output path.
+            gcp_common.dump_file('/tmp/kfp/output/bigquery/query_output.csv',
+                query_result.to_dataframe().to_csv())
         _dump_outputs(query_job, output_gcs_path)
         return query_job.to_api_repr()

@@ -71,6 +80,30 @@ def _get_job(client, job_id):
     except exceptions.NotFound:
         return None

+def _prepare_dataset_ref(client, dataset_id, output_gcs_path):
+    if not output_gcs_path and not dataset_id:
+        return None
+
+    if not dataset_id:
+        dataset_id = 'kfp_tmp_dataset'
+    dataset_ref = client.dataset(dataset_id)
+    dataset = _get_dataset(client, dataset_ref)
+    if not dataset:
+        logging.info('Creating dataset {}'.format(dataset_id))
+        dataset = _create_dataset(client, dataset_ref)
+    return dataset_ref
+
+def _get_dataset(client, dataset_ref):
+    try:
+        return client.get_dataset(dataset_ref)
+    except exceptions.NotFound:
+        return None
+
+def _create_dataset(client, dataset_ref):
+    dataset = bigquery.Dataset(dataset_ref)
+    dataset.location = "US"
+    return client.create_dataset(dataset)
+
 def _display_job_link(project_id, job_id):
     display.display(display.Link(
         href= 'https://console.cloud.google.com/bigquery?project={}'
@@ -84,4 +117,4 @@ def _dump_outputs(job, output_path):
     gcp_common.dump_file('/tmp/kfp/output/bigquery/query-job.json',
         json.dumps(job.to_api_repr()))
     if not output_path:
         output_path = ''
     gcp_common.dump_file('/tmp/kfp/output/bigquery/query-output-path.txt',
-        output_path)
\ No newline at end of file
+        output_path)
diff --git a/component_sdk/python/tests/google/bigquery/test__query.py b/component_sdk/python/tests/google/bigquery/test__query.py
index 06d91a42747..f663ae84861 100644
--- a/component_sdk/python/tests/google/bigquery/test__query.py
+++ b/component_sdk/python/tests/google/bigquery/test__query.py
@@ -25,10 +25,13 @@
 @mock.patch(CREATE_JOB_MODULE + '.bigquery.Client')
 class TestQuery(unittest.TestCase):

-    def test_create_job_succeed(self, mock_client,
+    def test_query_succeed(self, mock_client,
         mock_kfp_context, mock_dump_json, mock_display):
         mock_kfp_context().__enter__().context_id.return_value = 'ctx1'
         mock_client().get_job.side_effect = exceptions.NotFound('not found')
+        mock_dataset = bigquery.DatasetReference('project-1', 'dataset-1')
+        mock_client().dataset.return_value = mock_dataset
+        mock_client().get_dataset.side_effect = exceptions.NotFound('not found')
         mock_response = {
             'configuration': {
                 'query': {
@@ -37,17 +40,16 @@ def test_create_job_succeed(self, mock_client,
                     'query': 'SELECT * FROM table_1'
                 }
             }
         }
         mock_client().query.return_value.to_api_repr.return_value = mock_response
-        mock_dataset = bigquery.DatasetReference('project-1', 'dataset-1')
-        mock_client().dataset.return_value = mock_dataset

         result = query('SELECT * FROM table_1', 'project-1', 'dataset-1',
             output_gcs_path='gs://output/path')

         self.assertEqual(mock_response, result)
+        mock_client().create_dataset.assert_called()
         expected_job_config = bigquery.QueryJobConfig()
         expected_job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
         expected_job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
-        expected_job_config.destination = mock_dataset.table('table_ctx1')
+        expected_job_config.destination = mock_dataset.table('query_ctx1')
         mock_client().query.assert_called_with('SELECT * FROM table_1', mock.ANY,
             job_id = 'query_ctx1')
         actual_job_config = mock_client().query.call_args_list[0][0][1]
@@ -56,6 +58,28 @@ def test_create_job_succeed(self, mock_client,
             actual_job_config.to_api_repr()
         )
         mock_client().extract_table.assert_called_with(
-            mock_dataset.table('table_ctx1'),
+            mock_dataset.table('query_ctx1'),
             'gs://output/path')
+        self.assertEqual(2, mock_dump_json.call_count)
+
+    def test_query_dump_locally(self, mock_client,
+        mock_kfp_context, mock_dump_json, mock_display):
+        mock_kfp_context().__enter__().context_id.return_value = 'ctx1'
+        mock_client().get_job.side_effect = exceptions.NotFound('not found')
+        mock_response = {
+            'configuration': {
+                'query': {
+                    'query': 'SELECT * FROM table_1'
+                }
+            }
+        }
+        mock_client().query.return_value.to_api_repr.return_value = mock_response
+
+        result = query('SELECT * FROM table_1', 'project-1')
+
+        self.assertEqual(mock_response, result)
+        mock_client().create_dataset.assert_not_called()
+        mock_client().query.assert_called()
+        mock_client().extract_table.assert_not_called()
+        self.assertEqual(3, mock_dump_json.call_count)

From 5b513ee4fd1fe734b08b44291194d317bac101cb Mon Sep 17 00:00:00 2001
From: Hongye Sun
Date: Sat, 2 Mar 2019 16:32:27 -0800
Subject: [PATCH 3/5] Make dataset location configurable

---
 .../python/kfp_component/google/bigquery/_query.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/component_sdk/python/kfp_component/google/bigquery/_query.py b/component_sdk/python/kfp_component/google/bigquery/_query.py
index a3c3d8f6c44..ab99b0f1c7d 100644
--- a/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -22,7 +22,7 @@
 from .. import common as gcp_common

 def query(query, project_id, dataset_id=None, table_id=None,
-    output_gcs_path=None, job_config=None):
+    output_gcs_path=None, dataset_location='US', job_config=None):
     """Submit a query to the BigQuery service and dump outputs to a GCS blob.

     Args:
@@ -34,6 +34,7 @@ def query(query, project_id, dataset_id=None, table_id=None,
         table_id (str): The ID of the table to keep the results of the query. If
             absent, the operation will generate a random ID for the table.
         output_gcs_path (str): The GCS blob path to dump the query results to.
+        dataset_location (str): The location to create the dataset.
+            Defaults to `US`.
         job_config (dict): The full config spec for the query job.
     Returns:
         The API representation of the completed query job.
@@ -52,7 +53,8 @@ def cancel():
         query_job = _get_job(client, job_id)
         table_ref = None
         if not query_job:
-            dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path)
+            dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path,
+                dataset_location)
             if dataset_ref:
                 if not table_id:
                     table_id = job_id
@@ -80,7 +82,7 @@ def _get_job(client, job_id):
     except exceptions.NotFound:
         return None

-def _prepare_dataset_ref(client, dataset_id, output_gcs_path):
+def _prepare_dataset_ref(client, dataset_id, output_gcs_path, dataset_location):
     if not output_gcs_path and not dataset_id:
         return None

@@ -90,7 +92,7 @@
     dataset = _get_dataset(client, dataset_ref)
     if not dataset:
         logging.info('Creating dataset {}'.format(dataset_id))
-        dataset = _create_dataset(client, dataset_ref)
+        dataset = _create_dataset(client, dataset_ref, dataset_location)
     return dataset_ref

 def _get_dataset(client, dataset_ref):
@@ -99,9 +101,9 @@
     except exceptions.NotFound:
         return None

-def _create_dataset(client, dataset_ref):
+def _create_dataset(client, dataset_ref, location):
     dataset = bigquery.Dataset(dataset_ref)
-    dataset.location = "US"
+    dataset.location = location
     return client.create_dataset(dataset)

 def _display_job_link(project_id, job_id):

From 594f6e9992773cf4fd278cbea105208f930e3161 Mon Sep 17 00:00:00 2001
From: Hongye Sun
Date: Sat, 2 Mar 2019 16:36:17 -0800
Subject: [PATCH 4/5] Add TODO to make KFP output path configurable.

---
 .../python/kfp_component/google/bigquery/_query.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/component_sdk/python/kfp_component/google/bigquery/_query.py b/component_sdk/python/kfp_component/google/bigquery/_query.py
index ab99b0f1c7d..29138a79a87 100644
--- a/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -21,6 +21,9 @@
 from kfp_component.core import KfpExecutionContext, display
 from .. import common as gcp_common

+# TODO(hongyes): make this path configurable as an environment variable
+KFP_OUTPUT_PATH = '/tmp/kfp/output/'
+
 def query(query, project_id, dataset_id=None, table_id=None,
     output_gcs_path=None, dataset_location='US', job_config=None):
     """Submit a query to the BigQuery service and dump outputs to a GCS blob.
@@ -71,7 +74,7 @@ def cancel():
             extract_job.result() # Wait for export to
         else:
             # Download results to local disk if no gcs output path.
-            gcp_common.dump_file('/tmp/kfp/output/bigquery/query_output.csv',
+            gcp_common.dump_file(KFP_OUTPUT_PATH + 'bigquery/query_output.csv',
                 query_result.to_dataframe().to_csv())
         _dump_outputs(query_job, output_gcs_path)
         return query_job.to_api_repr()
@@ -114,9 +117,9 @@ def _display_job_link(project_id, job_id):
     ))

 def _dump_outputs(job, output_path):
-    gcp_common.dump_file('/tmp/kfp/output/bigquery/query-job.json',
+    gcp_common.dump_file(KFP_OUTPUT_PATH + 'bigquery/query-job.json',
         json.dumps(job.to_api_repr()))
     if not output_path:
         output_path = ''
-    gcp_common.dump_file('/tmp/kfp/output/bigquery/query-output-path.txt',
+    gcp_common.dump_file(KFP_OUTPUT_PATH + 'bigquery/query-output-path.txt',
         output_path)

From cdfe024d58ac461bf4a72aaaea147d3f963c60fb Mon Sep 17 00:00:00 2001
From: Hongye Sun
Date: Tue, 5 Mar 2019 15:08:12 -0800
Subject: [PATCH 5/5] Fix comment

---
 component_sdk/python/kfp_component/google/bigquery/_query.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/component_sdk/python/kfp_component/google/bigquery/_query.py b/component_sdk/python/kfp_component/google/bigquery/_query.py
index 29138a79a87..398c6c15bd3 100644
--- a/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -71,7 +71,7 @@ def cancel():
             extract_job = _get_job(client, job_id)
             if not extract_job:
                 extract_job = client.extract_table(table_ref, output_gcs_path)
-            extract_job.result() # Wait for export to
+            extract_job.result() # Wait for export to finish
         else:
             # Download results to local disk if no gcs output path.
             gcp_common.dump_file(KFP_OUTPUT_PATH + 'bigquery/query_output.csv',
                 query_result.to_dataframe().to_csv())
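Usage note (editor's addition, not part of the patch series): a minimal sketch of how the component might be called once this series lands. It assumes the kfp_component.google.bigquery package re-exports query from _query.py; the project, bucket, and table names are placeholders, not values from the patches.

    from kfp_component.google.bigquery import query

    # With no dataset_id, a temporary dataset is auto-created if missing
    # (patch 2) in the requested location (patch 3); the results are
    # extracted to the GCS blob, and the job spec plus output path are
    # mirrored under /tmp/kfp/output/ (patches 1 and 4).
    job = query(
        'SELECT * FROM `my-project.my_dataset.events`',  # placeholder table
        project_id='my-project',                         # placeholder project
        output_gcs_path='gs://my-bucket/bigquery/results.csv',
        dataset_location='EU')
    print(job['id'])  # API representation of the completed query job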