From 7b8c588b043208b068c97b92a7aa5853787a9def Mon Sep 17 00:00:00 2001
From: Jonathan Amsterdam
Date: Fri, 13 Oct 2017 16:31:12 -0400
Subject: [PATCH] bigquery: support external table definitions for query jobs

Also, set ExternalConfig.options based on source_format, and make it
read-only.

Also, change the from_api_repr functions in external_config.py so that
they don't modify their resource argument. This simplifies tests.
---
 bigquery/google/cloud/bigquery/_helpers.py    |   8 +
 .../google/cloud/bigquery/external_config.py  | 233 +++++++++---------
 bigquery/google/cloud/bigquery/job.py         |  41 +--
 bigquery/nox.py                               |   2 +-
 bigquery/tests/system.py                      | 126 +++++-----
 bigquery/tests/unit/test_external_config.py   |  36 +--
 bigquery/tests/unit/test_job.py               | 121 +++++++--
 7 files changed, 328 insertions(+), 239 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index d4230f9ff4f60..2d763109e7453 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -546,3 +546,11 @@ def _should_retry(exc):
     on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
     pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
     """
+
+
+def _int_or_none(value):
+    """Helper: deserialize int value from JSON string."""
+    if isinstance(value, int):
+        return value
+    if value is not None:
+        return int(value)
diff --git a/bigquery/google/cloud/bigquery/external_config.py b/bigquery/google/cloud/bigquery/external_config.py
index 9177595da67ca..e3560224008c9 100644
--- a/bigquery/google/cloud/bigquery/external_config.py
+++ b/bigquery/google/cloud/bigquery/external_config.py
@@ -29,122 +29,10 @@
 from google.cloud.bigquery._helpers import _bytes_to_json
 from google.cloud.bigquery._helpers import _TypedApiResourceProperty
 from google.cloud.bigquery._helpers import _ListApiResourceProperty
+from google.cloud.bigquery._helpers import _int_or_none
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import _build_schema_resource
 from google.cloud.bigquery.table import _parse_schema_resource
-from google.cloud.bigquery.job import _int_or_none
-
-
-class ExternalConfig(object):
-    """Description of an external data source.
-
-    :type source_format: str
-    :param source_format: the format of the external data. See
-        the ``source_format`` property on this class.
- """ - - def __init__(self, source_format): - self._properties = {'sourceFormat': source_format} - self._options = None - - @property - def source_format(self): - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat - """ - return self._properties['sourceFormat'] - - autodetect = _TypedApiResourceProperty( - 'autodetect', 'autodetect', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).autodetect - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.autodetect - """ - - compression = _TypedApiResourceProperty( - 'compression', 'compression', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).compression - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.compression - """ - - ignore_unknown_values = _TypedApiResourceProperty( - 'ignore_unknown_values', 'ignoreUnknownValues', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).ignoreUnknownValues - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.ignoreUnknownValues - """ - - max_bad_records = _TypedApiResourceProperty( - 'max_bad_records', 'maxBadRecords', six.integer_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).maxBadRecords - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.maxBadRecords - """ - - source_uris = _ListApiResourceProperty( - 'source_uris', 'sourceUris', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceUris - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceUris - """ - - schema = _ListApiResourceProperty('schema', 'schema', SchemaField) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).schema - https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.schema - """ - - @property - def options(self): - """Source-specific options. A subclass of ExternalConfigOptions.""" - return self._options - - @options.setter - def options(self, value): - if self.source_format != value._SOURCE_FORMAT: - raise ValueError( - 'source format %s does not match option type %s' % ( - self.source_format, value.__class__.__name__)) - self._options = value - - def to_api_repr(self): - """Build an API representation of this object. - - :rtype: dict - :returns: A dictionary in the format used by the BigQuery API. - """ - config = copy.deepcopy(self._properties) - if self.schema: - config['schema'] = {'fields': _build_schema_resource(self.schema)} - if self.options is not None: - config[self.options._RESOURCE_NAME] = self.options.to_api_repr() - return config - - @classmethod - def from_api_repr(cls, resource): - """Factory: construct a CSVOptions given its API representation - - :type resource: dict - :param resource: - An extract job configuration in the same representation as is - returned from the API. 
-
-        :rtype: :class:`google.cloud.bigquery.external_config.CSVOptions`
-        :returns: Configuration parsed from ``resource``.
-        """
-        config = cls(resource['sourceFormat'])
-        schema = resource.pop('schema', None)
-        for optcls in (BigtableOptions, CSVOptions, GoogleSheetsOptions):
-            opts = resource.pop(optcls._RESOURCE_NAME, None)
-            if opts is not None:
-                config.options = optcls.from_api_repr(opts)
-                break
-        config._properties = copy.deepcopy(resource)
-        if schema:
-            config.schema = _parse_schema_resource(schema)
-        return config
 
 
 class BigtableColumn(object):
@@ -220,9 +108,9 @@ def from_api_repr(cls, resource):
         :rtype: :class:`google.cloud.bigquery.external_config.BigtableColumn`
         :returns: Configuration parsed from ``resource``.
         """
-        qe = resource.pop('qualifierEncoded', None)
         config = cls()
         config._properties = copy.deepcopy(resource)
+        qe = resource.get('qualifierEncoded')
         if qe:
             config.qualifier_encoded = base64.standard_b64decode(_to_bytes(qe))
         return config
@@ -436,7 +324,7 @@ def from_api_repr(cls, resource):
         :rtype: :class:`google.cloud.bigquery.external_config.CSVOptions`
         :returns: Configuration parsed from ``resource``.
         """
-        slr = resource.pop('skipLeadingRows', None)
+        slr = resource.get('skipLeadingRows')
         config = cls()
         config._properties = copy.deepcopy(resource)
         config.skip_leading_rows = _int_or_none(slr)
@@ -484,8 +372,121 @@ def from_api_repr(cls, resource):
             :class:`google.cloud.bigquery.external_config.GoogleSheetsOptions`
         :returns: Configuration parsed from ``resource``.
         """
-        slr = resource.pop('skipLeadingRows', None)
+        slr = resource.get('skipLeadingRows')
         config = cls()
         config._properties = copy.deepcopy(resource)
         config.skip_leading_rows = _int_or_none(slr)
         return config
+
+
+_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions)
+
+
+class ExternalConfig(object):
+    """Description of an external data source.
+
+    :type source_format: str
+    :param source_format: the format of the external data. See
+        the ``source_format`` property on this class.
+ """ + + def __init__(self, source_format): + self._properties = {'sourceFormat': source_format} + self._options = None + for optcls in _OPTION_CLASSES: + if source_format == optcls._SOURCE_FORMAT: + self._options = optcls() + break + + @property + def source_format(self): + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat + """ + return self._properties['sourceFormat'] + + @property + def options(self): + """Source-specific options.""" + return self._options + + autodetect = _TypedApiResourceProperty( + 'autodetect', 'autodetect', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).autodetect + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.autodetect + """ + + compression = _TypedApiResourceProperty( + 'compression', 'compression', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).compression + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.compression + """ + + ignore_unknown_values = _TypedApiResourceProperty( + 'ignore_unknown_values', 'ignoreUnknownValues', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).ignoreUnknownValues + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.ignoreUnknownValues + """ + + max_bad_records = _TypedApiResourceProperty( + 'max_bad_records', 'maxBadRecords', six.integer_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).maxBadRecords + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.maxBadRecords + """ + + source_uris = _ListApiResourceProperty( + 'source_uris', 'sourceUris', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceUris + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceUris + """ + + schema = _ListApiResourceProperty('schema', 'schema', SchemaField) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).schema + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.schema + """ + + def to_api_repr(self): + """Build an API representation of this object. + + :rtype: dict + :returns: A dictionary in the format used by the BigQuery API. + """ + config = copy.deepcopy(self._properties) + if self.schema: + config['schema'] = {'fields': _build_schema_resource(self.schema)} + if self.options is not None: + r = self.options.to_api_repr() + if r != {}: + config[self.options._RESOURCE_NAME] = r + return config + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct a CSVOptions given its API representation + + :type resource: dict + :param resource: + An extract job configuration in the same representation as is + returned from the API. + + :rtype: :class:`google.cloud.bigquery.external_config.CSVOptions` + :returns: Configuration parsed from ``resource``. 
+ """ + config = cls(resource['sourceFormat']) + schema = resource.get('schema') + for optcls in _OPTION_CLASSES: + opts = resource.get(optcls._RESOURCE_NAME) + if opts is not None: + config._options = optcls.from_api_repr(opts) + break + config._properties = copy.deepcopy(resource) + if schema: + config.schema = _parse_schema_resource(schema) + return config diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 65da699563699..e30f6143cf5bc 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -25,6 +25,7 @@ from google.cloud.exceptions import NotFound from google.cloud._helpers import _datetime_from_microseconds from google.cloud.bigquery.dataset import DatasetReference +from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.query import _AbstractQueryParameter from google.cloud.bigquery.query import _query_param_from_api_repr from google.cloud.bigquery.query import ArrayQueryParameter @@ -39,6 +40,7 @@ from google.cloud.bigquery._helpers import _ListApiResourceProperty from google.cloud.bigquery._helpers import _TypedApiResourceProperty from google.cloud.bigquery._helpers import DEFAULT_RETRY +from google.cloud.bigquery._helpers import _int_or_none _DONE_STATE = 'DONE' _STOPPED_REASON = 'stopped' @@ -65,22 +67,6 @@ } -def _bool_or_none(value): - """Helper: deserialize boolean value from JSON string.""" - if isinstance(value, bool): - return value - if value is not None: - return value.lower() in ['t', 'true', '1'] - - -def _int_or_none(value): - """Helper: deserialize int value from JSON string.""" - if isinstance(value, int): - return value - if value is not None: - return int(value) - - def _error_result_to_exception(error_result): """Maps BigQuery error reasons to an exception. @@ -1315,6 +1301,14 @@ def _to_api_repr_udf_resources(value): ] +def _from_api_repr_table_defs(resource): + return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()} + + +def _to_api_repr_table_defs(value): + return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()} + + class QueryJobConfig(object): """Configuration options for query jobs. @@ -1469,6 +1463,12 @@ def from_api_repr(cls, resource): https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition """ + table_definitions = _TypedApiResourceProperty( + 'table_definitions', 'tableDefinitions', dict) + """See + https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions + """ + _maximum_billing_tier = None _maximum_bytes_billed = None @@ -1478,6 +1478,8 @@ def from_api_repr(cls, resource): 'destinationTable': ( TableReference.from_api_repr, TableReference.to_api_repr), 'maximumBytesBilled': (int, str), + 'tableDefinitions': (_from_api_repr_table_defs, + _to_api_repr_table_defs), _QUERY_PARAMETERS_KEY: ( _from_api_repr_query_parameters, _to_api_repr_query_parameters), _UDF_RESOURCES_KEY: ( @@ -1615,6 +1617,13 @@ def maximum_bytes_billed(self): """ return self._configuration.maximum_bytes_billed + @property + def table_definitions(self): + """See + :class:`~google.cloud.bigquery.job.QueryJobConfig.table_definitions`. 
+ """ + return self._configuration.table_definitions + def _build_resource(self): """Generate a resource for :meth:`begin`.""" configuration = self._configuration.to_api_repr() diff --git a/bigquery/nox.py b/bigquery/nox.py index 23b2771bd523c..2dd4fb431e665 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -76,7 +76,7 @@ def system_tests(session, python_version): os.path.join('..', 'storage'), os.path.join('..', 'test_utils'), ) - session.install('.') + session.install('-e', '.') # Run py.test against the system tests. session.run( diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 2c62e5efa9b06..9016444b47a6d 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -436,47 +436,23 @@ def test_load_table_from_local_avro_file_then_dump_table(self): sorted(ROWS, key=by_wavelength)) def test_load_table_from_storage_then_dump_table(self): - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.storage import Client as StorageClient - - local_id = unique_resource_id() - BUCKET_NAME = 'bq_load_test' + local_id - BLOB_NAME = 'person_ages.csv' - GS_URL = 'gs://%s/%s' % (BUCKET_NAME, BLOB_NAME) + TABLE_ID = 'test_table' ROWS = [ ('Phred Phlyntstone', 32), ('Bharney Rhubble', 33), ('Wylma Phlyntstone', 29), ('Bhettye Rhubble', 27), ] - TABLE_NAME = 'test_table' - - storage_client = StorageClient() - - # In the **very** rare case the bucket name is reserved, this - # fails with a ConnectionError. - bucket = storage_client.create_bucket(BUCKET_NAME) - self.to_delete.append(bucket) - - blob = bucket.blob(BLOB_NAME) - - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as csv_write: - writer = csv.writer(csv_write) - writer.writerow(('Full Name', 'Age')) - writer.writerows(ROWS) - - with open(temp.name, 'rb') as csv_read: - blob.upload_from_file(csv_read, content_type='text/csv') - - self.to_delete.insert(0, blob) + GS_URL = self._write_csv_to_storage( + 'bq_load_test' + unique_resource_id(), 'person_ages.csv', + ('Full Name', 'Age'), ROWS) dataset = self.temp_dataset(_make_dataset_id('load_gcs_then_dump')) full_name = bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED') age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED') - table_arg = Table(dataset.table(TABLE_NAME), schema=[full_name, age]) + table_arg = Table(dataset.table(TABLE_ID), schema=[full_name, age]) table = retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) @@ -486,7 +462,7 @@ def test_load_table_from_storage_then_dump_table(self): config.source_format = 'CSV' config.write_disposition = 'WRITE_EMPTY' job = Config.CLIENT.load_table_from_storage( - GS_URL, dataset.table(TABLE_NAME), job_config=config) + GS_URL, dataset.table(TABLE_ID), job_config=config) # Allow for 90 seconds of "warm up" before rows visible. 
         # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
@@ -501,44 +477,19 @@ def test_load_table_from_storage_then_dump_table(self):
             sorted(ROWS, key=by_age))
 
     def test_load_table_from_storage_w_autodetect_schema(self):
-        from google.cloud._testing import _NamedTemporaryFile
-        from google.cloud.storage import Client as StorageClient
         from google.cloud.bigquery import SchemaField
 
-        local_id = unique_resource_id()
-        bucket_name = 'bq_load_test' + local_id
-        blob_name = 'person_ages.csv'
-        gs_url = 'gs://{}/{}'.format(bucket_name, blob_name)
         rows = [
             ('Phred Phlyntstone', 32),
            ('Bharney Rhubble', 33),
             ('Wylma Phlyntstone', 29),
             ('Bhettye Rhubble', 27),
         ] * 100  # BigQuery internally uses the first 100 rows to detect schema
-        table_name = 'test_table'
-
-        storage_client = StorageClient()
-
-        # In the **very** rare case the bucket name is reserved, this
-        # fails with a ConnectionError.
-        bucket = storage_client.create_bucket(bucket_name)
-        self.to_delete.append(bucket)
-
-        blob = bucket.blob(blob_name)
-
-        with _NamedTemporaryFile() as temp:
-            with open(temp.name, 'w') as csv_write:
-                writer = csv.writer(csv_write)
-                writer.writerow(('Full Name', 'Age'))
-                writer.writerows(rows)
-
-            with open(temp.name, 'rb') as csv_read:
-                blob.upload_from_file(csv_read, content_type='text/csv')
-
-        self.to_delete.insert(0, blob)
-
+        gs_url = self._write_csv_to_storage(
+            'bq_load_test' + unique_resource_id(), 'person_ages.csv',
+            ('Full Name', 'Age'), rows)
         dataset = self.temp_dataset(_make_dataset_id('load_gcs_then_dump'))
-        table_ref = dataset.table(table_name)
+        table_ref = dataset.table('test_table')
 
         config = bigquery.LoadJobConfig()
         config.autodetect = True
@@ -564,6 +515,33 @@ def test_load_table_from_storage_w_autodetect_schema(self):
         self.assertEqual(
             sorted(actual_row_tuples, key=by_age), sorted(rows, key=by_age))
 
+    def _write_csv_to_storage(self, bucket_name, blob_name, header_row,
+                              data_rows):
+        from google.cloud._testing import _NamedTemporaryFile
+        from google.cloud.storage import Client as StorageClient
+
+        storage_client = StorageClient()
+
+        # In the **very** rare case the bucket name is reserved, this
+        # fails with a ConnectionError.
+        bucket = storage_client.create_bucket(bucket_name)
+        self.to_delete.append(bucket)
+
+        blob = bucket.blob(blob_name)
+
+        with _NamedTemporaryFile() as temp:
+            with open(temp.name, 'w') as csv_write:
+                writer = csv.writer(csv_write)
+                writer.writerow(header_row)
+                writer.writerows(data_rows)
+
+            with open(temp.name, 'rb') as csv_read:
+                blob.upload_from_file(csv_read, content_type='text/csv')
+
+        self.to_delete.insert(0, blob)
+
+        return 'gs://{}/{}'.format(bucket_name, blob_name)
+
     def _load_table_for_extract_table(
             self, storage_client, rows, bucket_name, blob_name, table):
         from google.cloud._testing import _NamedTemporaryFile
@@ -1271,6 +1249,36 @@ def test_query_future(self):
         row_tuples = [r.values() for r in iterator]
         self.assertEqual(row_tuples, [(1,)])
 
+    def test_query_table_def(self):
+        rows = [
+            ('Phred Phlyntstone', 32),
+            ('Bharney Rhubble', 33),
+            ('Wylma Phlyntstone', 29),
+            ('Bhettye Rhubble', 27),
+        ]
+        gs_url = self._write_csv_to_storage(
+            'bq_load_test' + unique_resource_id(), 'person_ages.csv',
+            ('Full Name', 'Age'), rows)
+
+        job_config = bigquery.QueryJobConfig()
+        table_id = 'flintstones'
+        ec = bigquery.ExternalConfig('CSV')
+        ec.source_uris = [gs_url]
+        ec.schema = [
+            bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
+            bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
+        ]
+        ec.options.skip_leading_rows = 1  # skip the header row
+        job_config.table_definitions = {table_id: ec}
+        sql = 'SELECT * from %s' % table_id
+
+        got_rows = Config.CLIENT.query_rows(sql, job_config=job_config)
+
+        row_tuples = [r.values() for r in got_rows]
+        by_age = operator.itemgetter(1)
+        self.assertEqual(sorted(row_tuples, key=by_age),
+                         sorted(rows, key=by_age))
+
     def test_create_rows_nested_nested(self):
         # See #2951
         SF = bigquery.SchemaField
diff --git a/bigquery/tests/unit/test_external_config.py b/bigquery/tests/unit/test_external_config.py
index 6768093ed0b3c..b7887428606d9 100644
--- a/bigquery/tests/unit/test_external_config.py
+++ b/bigquery/tests/unit/test_external_config.py
@@ -55,7 +55,6 @@ def test_api_repr_base(self):
                 ],
             },
         })
-        want_resource = copy.deepcopy(resource)
 
         ec = ExternalConfig.from_api_repr(resource)
 
         self._verify_base(ec)
         self.assertEqual(ec.schema,
@@ -63,7 +62,7 @@ def test_api_repr_base(self):
         self.assertIsNone(ec.options)
 
         got_resource = ec.to_api_repr()
 
-        self.assertEqual(got_resource, want_resource)
+        self.assertEqual(got_resource, resource)
 
     def _verify_base(self, ec):
         self.assertEqual(ec.autodetect, True)
@@ -85,7 +84,6 @@ def test_api_repr_sheets(self):
             'sourceFormat': 'GOOGLE_SHEETS',
             'googleSheetsOptions': {'skipLeadingRows': '123'},
         })
-        want_resource = copy.deepcopy(resource)
 
         ec = ExternalConfig.from_api_repr(resource)
 
@@ -96,13 +94,13 @@ def test_api_repr_sheets(self):
 
         got_resource = ec.to_api_repr()
 
-        self.assertEqual(got_resource, want_resource)
+        self.assertEqual(got_resource, resource)
 
-        del want_resource['googleSheetsOptions']['skipLeadingRows']
-        ec = ExternalConfig.from_api_repr(copy.deepcopy(want_resource))
+        del resource['googleSheetsOptions']['skipLeadingRows']
+        ec = ExternalConfig.from_api_repr(resource)
         self.assertIsNone(ec.options.skip_leading_rows)
         got_resource = ec.to_api_repr()
-        self.assertEqual(got_resource, want_resource)
+        self.assertEqual(got_resource, resource)
 
     def test_api_repr_csv(self):
         from google.cloud.bigquery.external_config import CSVOptions
@@ -118,7 +116,6 @@ def test_api_repr_csv(self):
                 'encoding': 'encoding',
             },
         })
-        want_resource = copy.deepcopy(resource)
 
         ec = ExternalConfig.from_api_repr(resource)
@@ -134,13 +131,13 @@ def test_api_repr_csv(self):
 
         got_resource = ec.to_api_repr()
 
-        self.assertEqual(got_resource, want_resource)
+        self.assertEqual(got_resource, resource)
 
-        del want_resource['csvOptions']['skipLeadingRows']
-        ec = ExternalConfig.from_api_repr(copy.deepcopy(want_resource))
+        del resource['csvOptions']['skipLeadingRows']
+        ec = ExternalConfig.from_api_repr(resource)
         self.assertIsNone(ec.options.skip_leading_rows)
         got_resource = ec.to_api_repr()
-        self.assertEqual(got_resource, want_resource)
+        self.assertEqual(got_resource, resource)
 
     def test_api_repr_bigtable(self):
         from google.cloud.bigquery.external_config import BigtableOptions
@@ -178,7 +175,6 @@ def test_api_repr_bigtable(self):
             ],
         },
     })
-        want_resource = copy.deepcopy(resource)
 
         ec = ExternalConfig.from_api_repr(resource)
 
@@ -207,19 +203,7 @@ def test_api_repr_bigtable(self):
 
         got_resource = ec.to_api_repr()
 
-        self.assertEqual(got_resource, want_resource)
-
-    def test_option_mismatch(self):
-        from google.cloud.bigquery.external_config import CSVOptions
-        from google.cloud.bigquery.external_config import BigtableOptions
-        from google.cloud.bigquery.external_config import GoogleSheetsOptions
-
-        for source_format, opts in (('BIGTABLE', CSVOptions()),
-                                    ('CSV', GoogleSheetsOptions()),
-                                    ('GOOGLE_SHEETS', BigtableOptions())):
-            ec = ExternalConfig(source_format)
-            with self.assertRaises(ValueError):
-                ec.options = opts
+        self.assertEqual(got_resource, resource)
 
 
 def _copy_and_update(d, u):
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index c1c1903289685..0e0b667e704d7 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -42,27 +42,6 @@ def _make_client(project='test-project', connection=None):
     return client
 
 
-class Test__bool_or_none(unittest.TestCase):
-
-    def _call_fut(self, *args, **kwargs):
-        from google.cloud.bigquery import job
-
-        return job._bool_or_none(*args, **kwargs)
-
-    def test_w_bool(self):
-        self.assertTrue(self._call_fut(True))
-        self.assertFalse(self._call_fut(False))
-
-    def test_w_none(self):
-        self.assertIsNone(self._call_fut(None))
-
-    def test_w_str(self):
-        self.assertTrue(self._call_fut('1'))
-        self.assertTrue(self._call_fut('t'))
-        self.assertTrue(self._call_fut('true'))
-        self.assertFalse(self._call_fut('anything else'))
-
-
 class Test__int_or_none(unittest.TestCase):
 
     def _call_fut(self, *args, **kwargs):
@@ -1673,6 +1652,17 @@ def _verifyQueryParameters(self, job, config):
         for found, expected in zip(job.query_parameters, query_parameters):
             self.assertEqual(found.to_api_repr(), expected)
 
+    def _verify_table_definitions(self, job, config):
+        table_defs = config.get('tableDefinitions')
+        if job.table_definitions is None:
+            self.assertIsNone(table_defs)
+        else:
+            self.assertEqual(len(job.table_definitions), len(table_defs))
+            for found_key, found_ec in job.table_definitions.items():
+                expected_ec = table_defs.get(found_key)
+                self.assertIsNotNone(expected_ec)
+                self.assertEqual(found_ec.to_api_repr(), expected_ec)
+
     def _verify_configuration_properties(self, job, configuration):
         if 'dryRun' in configuration:
             self.assertEqual(job.dry_run,
@@ -1691,6 +1681,7 @@ def _verifyResourceProperties(self, job, resource):
         self._verifyIntegerResourceProperties(job, query_config)
         self._verify_udf_resources(job, query_config)
         self._verifyQueryParameters(job, query_config)
+        self._verify_table_definitions(job, query_config)
 
         self.assertEqual(job.query, query_config['query'])
         if 'createDisposition' in query_config:
@@ -1754,6 +1745,7 @@ def test_ctor_defaults(self):
         self.assertIsNone(job.write_disposition)
         self.assertIsNone(job.maximum_billing_tier)
         self.assertIsNone(job.maximum_bytes_billed)
+        self.assertIsNone(job.table_definitions)
 
     def test_ctor_w_udf_resources(self):
         from google.cloud.bigquery.job import QueryJobConfig
@@ -2516,6 +2508,93 @@ def test_begin_w_positional_query_parameter(self):
         self._verifyResourceProperties(job, RESOURCE)
         self.assertEqual(req['data'], SENT)
 
+    def test_begin_w_table_defs(self):
+        from google.cloud.bigquery.job import QueryJobConfig
+        from google.cloud.bigquery.external_config import ExternalConfig
+        from google.cloud.bigquery.external_config import BigtableColumn
+        from google.cloud.bigquery.external_config import BigtableColumnFamily
+
+        PATH = '/projects/%s/jobs' % (self.PROJECT,)
+        RESOURCE = self._makeResource()
+        # Ensure None for missing server-set props
+        del RESOURCE['statistics']['creationTime']
+        del RESOURCE['etag']
+        del RESOURCE['selfLink']
+        del RESOURCE['user_email']
+
+        bt_config = ExternalConfig('BIGTABLE')
+        bt_config.ignore_unknown_values = True
+        bt_config.options.read_rowkey_as_string = True
+        cf = BigtableColumnFamily()
+        cf.family_id = 'cf'
+        col = BigtableColumn()
+        col.field_name = 'fn'
+        cf.columns = [col]
+        bt_config.options.column_families = [cf]
+        BT_CONFIG_RESOURCE = {
+            'sourceFormat': 'BIGTABLE',
+            'ignoreUnknownValues': True,
+            'bigtableOptions': {
+                'readRowkeyAsString': True,
+                'columnFamilies': [{
+                    'familyId': 'cf',
+                    'columns': [{'fieldName': 'fn'}],
+                }],
+            },
+        }
+        CSV_CONFIG_RESOURCE = {
+            'sourceFormat': 'CSV',
+            'maxBadRecords': 8,
+            'csvOptions': {
+                'allowJaggedRows': True,
+            },
+        }
+        csv_config = ExternalConfig('CSV')
+        csv_config.max_bad_records = 8
+        csv_config.options.allow_jagged_rows = True
+        bt_table = 'bigtable-table'
+        csv_table = 'csv-table'
+        RESOURCE['configuration']['query']['tableDefinitions'] = {
+            bt_table: BT_CONFIG_RESOURCE,
+            csv_table: CSV_CONFIG_RESOURCE,
+        }
+        want_resource = copy.deepcopy(RESOURCE)
+        conn = _Connection(RESOURCE)
+        client = _make_client(project=self.PROJECT, connection=conn)
+        config = QueryJobConfig()
+        config.table_definitions = {
+            bt_table: bt_config,
+            csv_table: csv_config,
+        }
+        config.use_legacy_sql = True
+        job = self._make_one(
+            self.JOB_ID, self.QUERY, client, job_config=config)
+
+        job.begin()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_ID,
+            },
+            'configuration': {
+                'query': {
+                    'query': self.QUERY,
+                    'useLegacySql': True,
+                    'tableDefinitions': {
+                        bt_table: BT_CONFIG_RESOURCE,
+                        csv_table: CSV_CONFIG_RESOURCE,
+                    },
+                },
+            },
+        }
+        self._verifyResourceProperties(job, want_resource)
+        self.assertEqual(req['data'], SENT)
+
     def test_dry_run_query(self):
         from google.cloud.bigquery.job import QueryJobConfig
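
---

For reviewers, a minimal usage sketch of the feature this patch adds, modeled on the `test_query_table_def` system test above. It is illustrative only, not part of the patch; the `gs://` URI and the `people` alias are placeholders:

    from google.cloud import bigquery

    client = bigquery.Client()

    # Describe the external CSV data; the URI below is a placeholder.
    ec = bigquery.ExternalConfig('CSV')
    ec.source_uris = ['gs://my-bucket/person_ages.csv']
    ec.schema = [
        bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    # With this patch, `options` is built automatically from source_format
    # (a CSVOptions instance here) and the property itself is read-only.
    ec.options.skip_leading_rows = 1  # skip the CSV header row

    # Each dictionary key becomes a table name the query can reference.
    config = bigquery.QueryJobConfig()
    config.table_definitions = {'people': ec}

    rows = client.query_rows('SELECT full_name, age FROM people',
                             job_config=config)
    for row in rows:
        print(row.values())

The keys of `table_definitions` act as table aliases that exist only for the duration of the query, so external data can be queried without first running a load job.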