From 931285ff85842ab07a0ef2ff9db808181ea3c5e4 Mon Sep 17 00:00:00 2001
From: aribray <45905583+aribray@users.noreply.github.com>
Date: Mon, 14 Nov 2022 16:26:37 -0600
Subject: [PATCH] feat: add `reference_file_schema_uri` to LoadJobConfig,
 ExternalConfig (#1399)

* feat: add 'reference_file_schema_uri' to LoadJobConfig and ExternalConfig
---
 google/cloud/bigquery/external_config.py |  14 ++
 google/cloud/bigquery/job/load.py        |  21 +++
 testing/constraints-3.7.txt              |   2 +-
 tests/system/test_client.py              | 203 +++++++++++++++++++++++
 tests/unit/job/test_base.py              |   5 +-
 tests/unit/job/test_load.py              |  12 ++
 tests/unit/test_external_config.py       |   6 +
 7 files changed, 258 insertions(+), 5 deletions(-)

diff --git a/google/cloud/bigquery/external_config.py b/google/cloud/bigquery/external_config.py
index 640b2d16b..bd60e4ef1 100644
--- a/google/cloud/bigquery/external_config.py
+++ b/google/cloud/bigquery/external_config.py
@@ -756,6 +756,20 @@ def hive_partitioning(self, value):
         prop = value.to_api_repr() if value is not None else None
         self._properties["hivePartitioningOptions"] = prop
 
+    @property
+    def reference_file_schema_uri(self):
+        """Optional[str]: When creating an external table, the user can
+        provide a reference file with the table schema. This feature is
+        enabled for the following formats:
+
+        AVRO, PARQUET, ORC
+        """
+        return self._properties.get("referenceFileSchemaUri")
+
+    @reference_file_schema_uri.setter
+    def reference_file_schema_uri(self, value):
+        self._properties["referenceFileSchemaUri"] = value
+
     @property
     def ignore_unknown_values(self):
         """bool: If :data:`True`, extra values that are not represented in the
diff --git a/google/cloud/bigquery/job/load.py b/google/cloud/bigquery/job/load.py
index e4b44395e..5c7f26841 100644
--- a/google/cloud/bigquery/job/load.py
+++ b/google/cloud/bigquery/job/load.py
@@ -379,6 +379,20 @@ def range_partitioning(self, value):
             )
         self._set_sub_prop("rangePartitioning", resource)
 
+    @property
+    def reference_file_schema_uri(self):
+        """Optional[str]: When creating an external table, the user can
+        provide a reference file with the table schema. This feature is
+        enabled for the following formats:
+
+        AVRO, PARQUET, ORC
+        """
+        return self._get_sub_prop("referenceFileSchemaUri")
+
+    @reference_file_schema_uri.setter
+    def reference_file_schema_uri(self, value):
+        self._set_sub_prop("referenceFileSchemaUri", value)
+
     @property
     def schema(self):
         """Optional[Sequence[Union[ \
@@ -651,6 +665,13 @@ def quote_character(self):
         """
         return self._configuration.quote_character
 
+    @property
+    def reference_file_schema_uri(self):
+        """See
+        :attr:`google.cloud.bigquery.job.LoadJobConfig.reference_file_schema_uri`.
+ """ + return self._configuration.reference_file_schema_uri + @property def skip_leading_rows(self): """See diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 57928714f..2c5b169db 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -25,4 +25,4 @@ python-dateutil==2.7.3 requests==2.21.0 Shapely==1.6.4.post2 six==1.13.0 -tqdm==4.7.4 +tqdm==4.7.4 \ No newline at end of file diff --git a/tests/system/test_client.py b/tests/system/test_client.py index c99ee1c72..152bb8144 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -97,6 +97,20 @@ ), ] +SOURCE_URIS_AVRO = [ + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro", + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.avro", + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.avro", +] +SOURCE_URIS_PARQUET = [ + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet", + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.parquet", + "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.parquet", +] +REFERENCE_FILE_SCHEMA_URI_AVRO = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro" +REFERENCE_FILE_SCHEMA_URI_PARQUET = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet" + + # The VPC-SC team maintains a mirror of the GCS bucket used for code # samples. The public bucket crosses the configured security boundary. # See: https://github.com/googleapis/google-cloud-python/issues/8550 @@ -1052,6 +1066,195 @@ def test_load_table_from_file_w_explicit_location(self): table_ref, "gs://{}/letters-us.csv".format(bucket_name), location="US" ).result() + def test_create_external_table_with_reference_file_schema_uri_avro(self): + client = Config.CLIENT + dataset_id = _make_dataset_id("external_reference_file_avro") + self.temp_dataset(dataset_id) + dataset_ref = bigquery.DatasetReference(client.project, dataset_id) + table_id = "test_ref_file_avro" + table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id) + + expected_schema = [ + bigquery.SchemaField("username", "STRING", mode="NULLABLE"), + bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"), + bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"), + bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"), + ] + + # By default, the table should have the c-twitter schema because it is lexicographically last + # in the `SOURCE_URIs` list: + # a-twitter schema: (username, tweet, timestamp, likes) + # b-twitter schema: (username, tweet, timestamp) + # c-twitter schema: (username, tweet) + + # Because `referenceFileSchemaUri` is set as a-twitter, the table will have a-twitter schema + + # Create external data configuration + external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.AVRO) + external_config.source_uris = SOURCE_URIS_AVRO + external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO + + table = bigquery.Table(table_ref) + table.external_data_configuration = external_config + + table = client.create_table(table) + + # Get table created by the create_table API call + generated_table = client.get_table(table_ref) + + self.assertEqual(generated_table.schema, expected_schema) + self.assertEqual( + generated_table.external_data_configuration._properties[ + 
"referenceFileSchemaUri" + ], + REFERENCE_FILE_SCHEMA_URI_AVRO, + ) + + # Clean up test + self.to_delete.insert(0, generated_table) + + def test_load_table_from_uri_with_reference_file_schema_uri_avro(self): + dataset_id = _make_dataset_id("test_reference_file_avro") + self.temp_dataset(dataset_id) + client = Config.CLIENT + dataset_ref = bigquery.DatasetReference(client.project, dataset_id) + table_id = "test_ref_file_avro" + table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id) + + expected_schema = [ + bigquery.SchemaField("username", "STRING", mode="NULLABLE"), + bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"), + bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"), + bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"), + ] + + # By default, the table should have the c-twitter schema because it is lexicographically last + # in the `SOURCE_URIS` list: + # a-twitter schema: (username, tweet, timestamp, likes) + # b-twitter schema: (username, tweet, timestamp) + # c-twitter schema: (username, tweet) + + # Because `referenceFileSchemaUri` is set as a-twitter, the table will have a-twitter schema + + # Create load job configuration + load_job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.AVRO + ) + load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO + + load_job = client.load_table_from_uri( + source_uris=SOURCE_URIS_AVRO, + destination=table_ref, + job_config=load_job_config, + ) + # Wait for load job to complete + result = load_job.result() + + # Get table created by the load job + generated_table = client.get_table(table_ref) + self.assertEqual(generated_table.schema, expected_schema) + self.assertEqual( + result._properties["configuration"]["load"]["referenceFileSchemaUri"], + REFERENCE_FILE_SCHEMA_URI_AVRO, + ) + + # Clean up test + self.to_delete.insert(0, generated_table) + + def test_create_external_table_with_reference_file_schema_uri_parquet(self): + client = Config.CLIENT + dataset_id = _make_dataset_id("external_table_ref_file_parquet") + self.temp_dataset(dataset_id) + dataset_ref = bigquery.DatasetReference(client.project, dataset_id) + table_id = "test_ref_file_parquet" + table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id) + + expected_schema = [ + bigquery.SchemaField("username", "STRING", mode="NULLABLE"), + bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"), + bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"), + bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"), + ] + + # By default, the table should have the c-twitter schema because it is lexicographically last + # in the `SOURCE_URIS` list: + # a-twitter schema: (username, tweet, timestamp, likes) + # b-twitter schema: (username, tweet, timestamp) + # c-twitter schema: (username, tweet) + + # Because `referenceFileSchemaUri` is set as a-twitter, the table will have a-twitter schema + + # Create external data configuration + external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET) + external_config.source_uris = SOURCE_URIS_PARQUET + external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET + + table = bigquery.Table(table_ref) + table.external_data_configuration = external_config + + table = client.create_table(table) + + # Get table created by the create_table API call + generated_table = client.get_table(table_ref) + self.assertEqual(generated_table.schema, expected_schema) + self.assertEqual( + 
+            generated_table.external_data_configuration._properties[
+                "referenceFileSchemaUri"
+            ],
+            REFERENCE_FILE_SCHEMA_URI_PARQUET,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
+    def test_load_table_from_uri_with_reference_file_schema_uri_parquet(self):
+        dataset_id = _make_dataset_id("test_reference_file_parquet")
+        self.temp_dataset(dataset_id)
+        client = Config.CLIENT
+        dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
+        table_id = "test_ref_file_parquet"
+        table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)
+
+        expected_schema = [
+            bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
+        ]
+
+        # By default, the table would have the c-twitter schema, because it is
+        # lexicographically last in the `SOURCE_URIS_PARQUET` list:
+        # a-twitter schema: (username, tweet, timestamp, likes)
+        # b-twitter schema: (username, tweet, timestamp)
+        # c-twitter schema: (username, tweet)
+
+        # Because `referenceFileSchemaUri` is set to a-twitter, the table will have the a-twitter schema
+
+        # Create load job configuration
+        load_job_config = bigquery.LoadJobConfig(
+            source_format=bigquery.SourceFormat.PARQUET
+        )
+        load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET
+
+        load_job = client.load_table_from_uri(
+            source_uris=SOURCE_URIS_PARQUET,
+            destination=table_ref,
+            job_config=load_job_config,
+        )
+        # Wait for the load job to complete
+        result = load_job.result()
+
+        # Get the table created by the load job
+        generated_table = client.get_table(table_ref)
+        self.assertEqual(generated_table.schema, expected_schema)
+        self.assertEqual(
+            result._properties["configuration"]["load"]["referenceFileSchemaUri"],
+            REFERENCE_FILE_SCHEMA_URI_PARQUET,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
     def _write_csv_to_storage(self, bucket_name, blob_name, header_row, data_rows):
         from google.cloud._testing import _NamedTemporaryFile
diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py
index f0525c22a..ed0dc731b 100644
--- a/tests/unit/job/test_base.py
+++ b/tests/unit/job/test_base.py
@@ -943,7 +943,6 @@ def test_result_default_wo_state(self):
         conn = make_connection(
             _make_retriable_exception(),
             begun_job_resource,
-            _make_retriable_exception(),
             done_job_resource,
         )
         client = _make_client(project=self.PROJECT, connection=conn)
@@ -963,9 +962,7 @@ def test_result_default_wo_state(self):
             query_params={"location": "US"},
             timeout=None,
         )
-        conn.api_request.assert_has_calls(
-            [begin_call, begin_call, reload_call, reload_call]
-        )
+        conn.api_request.assert_has_calls([begin_call, begin_call, reload_call])
 
     def test_result_w_retry_wo_state(self):
         begun_job_resource = _make_job_resource(
diff --git a/tests/unit/job/test_load.py b/tests/unit/job/test_load.py
index cf2096b8b..143e1da59 100644
--- a/tests/unit/job/test_load.py
+++ b/tests/unit/job/test_load.py
@@ -37,6 +37,7 @@ def _setUpConstants(self):
         self.INPUT_BYTES = 12345
         self.OUTPUT_BYTES = 23456
         self.OUTPUT_ROWS = 345
+        self.REFERENCE_FILE_SCHEMA_URI = "gs://path/to/reference"
 
     def _make_resource(self, started=False, ended=False):
         resource = super(TestLoadJob, self)._make_resource(started, ended)
@@ -47,6 +48,7 @@ def _make_resource(self, started=False, ended=False):
             "datasetId": self.DS_ID,
             "tableId": self.TABLE_ID,
         }
+        config["referenceFileSchemaUri"] = self.REFERENCE_FILE_SCHEMA_URI
 
         if ended:
             resource["status"] = {"state": "DONE"}
@@ -136,6 +138,12 @@ def _verifyResourceProperties(self, job, resource):
             self.assertEqual(str(job.skip_leading_rows), config["skipLeadingRows"])
         else:
             self.assertIsNone(job.skip_leading_rows)
+        if "referenceFileSchemaUri" in config:
+            self.assertEqual(
+                job.reference_file_schema_uri, config["referenceFileSchemaUri"]
+            )
+        else:
+            self.assertIsNone(job.reference_file_schema_uri)
 
         if "destinationEncryptionConfiguration" in config:
             self.assertIsNotNone(job.destination_encryption_configuration)
@@ -186,6 +194,7 @@ def test_ctor(self):
         self.assertIsNone(job.use_avro_logical_types)
         self.assertIsNone(job.clustering_fields)
         self.assertIsNone(job.schema_update_options)
+        self.assertIsNone(job.reference_file_schema_uri)
 
     def test_ctor_w_config(self):
         from google.cloud.bigquery.schema import SchemaField
@@ -461,6 +470,7 @@ def test_begin_w_bound_client(self):
                         "datasetId": self.DS_ID,
                         "tableId": self.TABLE_ID,
                     },
+                    "referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
                 }
             },
         },
@@ -503,6 +513,7 @@ def test_begin_w_autodetect(self):
                         "datasetId": self.DS_ID,
                         "tableId": self.TABLE_ID,
                     },
+                    "referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
                     "autodetect": True,
                 }
             },
@@ -585,6 +596,7 @@ def test_begin_w_alternate_client(self):
         config.use_avro_logical_types = True
         config.write_disposition = WriteDisposition.WRITE_TRUNCATE
         config.schema_update_options = [SchemaUpdateOption.ALLOW_FIELD_ADDITION]
+        config.reference_file_schema_uri = "gs://path/to/reference"
         with mock.patch(
             "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes"
         ) as final_attributes:
diff --git a/tests/unit/test_external_config.py b/tests/unit/test_external_config.py
index 3ef61d738..72fe2761a 100644
--- a/tests/unit/test_external_config.py
+++ b/tests/unit/test_external_config.py
@@ -99,6 +99,12 @@ def test_connection_id(self):
         ec.connection_id = "path/to/connection"
         self.assertEqual(ec.connection_id, "path/to/connection")
 
+    def test_reference_file_schema_uri(self):
+        ec = external_config.ExternalConfig("")
+        self.assertIsNone(ec.reference_file_schema_uri)
+        ec.reference_file_schema_uri = "path/to/reference"
+        self.assertEqual(ec.reference_file_schema_uri, "path/to/reference")
+
     def test_schema_None(self):
         ec = external_config.ExternalConfig("")
         ec.schema = None
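
Usage, external-table path: a minimal sketch (not part of the patch) of the new
ExternalConfig.reference_file_schema_uri property, using the same public
cloud-samples-data files as the system tests above. It assumes default
application credentials and an existing dataset; "my_dataset" and the table name
"twitter_external" are placeholders.

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default credentials/project

    PREFIX = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema"
    source_uris = [f"{PREFIX}/{name}-twitter.avro" for name in ("a", "b", "c")]

    # Pin the table schema to the a-twitter file instead of letting BigQuery
    # infer it from the lexicographically last source file (c-twitter).
    external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.AVRO)
    external_config.source_uris = source_uris
    external_config.reference_file_schema_uri = f"{PREFIX}/a-twitter.avro"

    # "my_dataset.twitter_external" is a placeholder; the dataset must exist.
    table = bigquery.Table(f"{client.project}.my_dataset.twitter_external")
    table.external_data_configuration = external_config
    table = client.create_table(table)

    # The created table carries the four-column a-twitter schema.
    print([field.name for field in client.get_table(table).schema])
    # -> ['username', 'tweet', 'timestamp', 'likes']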
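
Usage, load-job path: the same idea through LoadJobConfig.reference_file_schema_uri,
under the same assumptions ("my_dataset.twitter_loaded" is a placeholder
destination):

    from google.cloud import bigquery

    client = bigquery.Client()

    PREFIX = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema"

    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
    # Take the schema from the a-twitter reference file for all three shards.
    job_config.reference_file_schema_uri = f"{PREFIX}/a-twitter.parquet"

    load_job = client.load_table_from_uri(
        source_uris=[f"{PREFIX}/{name}-twitter.parquet" for name in ("a", "b", "c")],
        destination=f"{client.project}.my_dataset.twitter_loaded",
        job_config=job_config,
    )
    load_job.result()  # waits for the load job to complete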