From 3284a0d38c10a4da9d4a34d7be2ecfd2f67ab7a9 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 19 Sep 2017 14:02:02 -0700 Subject: [PATCH] BQ: client.extract_table starts extract job (#3991) * BQ: client.extract_table starts extract job Add system tests for extract_table. * BigQuery: client.extract_table use `**kwargs` for Python 2.7. * BQ: extract_table. Use dict.get for kwargs. job_id instead of job_name. --- bigquery/google/cloud/bigquery/__init__.py | 2 + bigquery/google/cloud/bigquery/_helpers.py | 76 +++++++++ bigquery/google/cloud/bigquery/client.py | 39 +++-- bigquery/google/cloud/bigquery/job.py | 186 ++++++++++++++------- bigquery/tests/system.py | 101 +++++++++++ bigquery/tests/unit/test__helpers.py | 43 +++++ bigquery/tests/unit/test_client.py | 88 +++++++++- bigquery/tests/unit/test_job.py | 49 +++--- 8 files changed, 484 insertions(+), 100 deletions(-) diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index 00fa4445b0d0..333854035376 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -32,6 +32,7 @@ from google.cloud.bigquery.client import Client from google.cloud.bigquery.dataset import AccessEntry from google.cloud.bigquery.dataset import Dataset +from google.cloud.bigquery.job import ExtractJobConfig from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table @@ -41,6 +42,7 @@ 'ArrayQueryParameter', 'Client', 'Dataset', + 'ExtractJobConfig', 'ScalarQueryParameter', 'SchemaField', 'StructQueryParameter', diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 1ae0132e4480..ebb2fb77ad6c 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -299,6 +299,82 @@ def _time_to_json(value): _SCALAR_VALUE_TO_JSON_PARAM['TIMESTAMP'] = _timestamp_to_json_parameter +class _ApiResourceProperty(object): + """Base 
property implementation. + + Values will be stored on a `_properties` helper attribute of the + property's job instance. + + :type name: str + :param name: name of the property + + :type resource_name: str + :param resource_name: name of the property in the resource dictionary + """ + + def __init__(self, name, resource_name): + self.name = name + self.resource_name = resource_name + + def __get__(self, instance, owner): + """Descriptor protocol: accessor""" + if instance is None: + return self + return instance._properties.get(self.resource_name) + + def _validate(self, value): + """Subclasses override to impose validation policy.""" + pass + + def __set__(self, instance, value): + """Descriptor protocol: mutator""" + self._validate(value) + instance._properties[self.resource_name] = value + + def __delete__(self, instance): + """Descriptor protocol: deleter""" + del instance._properties[self.resource_name] + + +class _TypedApiResourceProperty(_ApiResourceProperty): + """Property implementation: validates based on value type. + + :type name: str + :param name: name of the property + + :type resource_name: str + :param resource_name: name of the property in the resource dictionary + + :type property_type: type or sequence of types + :param property_type: type to be validated + """ + def __init__(self, name, resource_name, property_type): + super(_TypedApiResourceProperty, self).__init__( + name, resource_name) + self.property_type = property_type + + def _validate(self, value): + """Ensure that 'value' is of the appropriate type. + + :raises: ValueError on a type mismatch. + """ + if value is None: + return + if not isinstance(value, self.property_type): + raise ValueError('Required type: %s' % (self.property_type,)) + + +class _EnumApiResourceProperty(_ApiResourceProperty): + """Pseudo-enumeration class. + + :type name: str + :param name: name of the property. 
+ + :type resource_name: str + :param resource_name: name of the property in the resource dictionary + """ + + class _ConfigurationProperty(object): """Base property implementation. diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 1b9e9a522a15..05be0da8123d 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -16,6 +16,8 @@ from __future__ import absolute_import +import uuid + from google.api.core import page_iterator from google.cloud.client import ClientWithProject from google.cloud.bigquery._http import Connection @@ -385,27 +387,44 @@ def copy_table(self, job_id, destination, *sources): """ return CopyJob(job_id, destination, sources, client=self) - def extract_table_to_storage(self, job_id, source, *destination_uris): - """Construct a job for extracting a table into Cloud Storage files. + def extract_table(self, source, *destination_uris, **kwargs): + """Start a job to extract a table into Cloud Storage files. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract - :type job_id: str - :param job_id: Name of the job. - - :type source: :class:`google.cloud.bigquery.table.Table` + :type source: :class:`google.cloud.bigquery.table.TableReference` :param source: table to be extracted. :type destination_uris: sequence of string - :param destination_uris: URIs of CloudStorage file(s) into which - table data is to be extracted; in format - ``gs:///``. + :param destination_uris: + URIs of Cloud Storage file(s) into which table data is to be + extracted; in format ``gs:///``. + + :type kwargs: dict + :param kwargs: Additional keyword arguments. + + :Keyword Arguments: + * *job_config* + (:class:`google.cloud.bigquery.job.ExtractJobConfig`) -- + (Optional) Extra configuration options for the extract job. + * *job_id* (``str``) -- + Additional content + (Optional) The ID of the job. 
:rtype: :class:`google.cloud.bigquery.job.ExtractJob` :returns: a new ``ExtractJob`` instance """ - return ExtractJob(job_id, source, destination_uris, client=self) + job_config = kwargs.get('job_config') + job_id = kwargs.get('job_id') + if job_id is None: + job_id = str(uuid.uuid4()) + + job = ExtractJob( + job_id, source, list(destination_uris), client=self, + job_config=job_config) + job.begin() + return job def run_async_query(self, job_id, query, udf_resources=(), query_parameters=()): diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 5807fcd25e0b..cfc861266355 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -14,6 +14,7 @@ """Define API Jobs.""" +import copy import threading import six @@ -24,6 +25,7 @@ from google.cloud.exceptions import NotFound from google.cloud._helpers import _datetime_from_microseconds from google.cloud.bigquery.dataset import Dataset +from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import TableReference @@ -35,8 +37,10 @@ from google.cloud.bigquery._helpers import StructQueryParameter from google.cloud.bigquery._helpers import UDFResource from google.cloud.bigquery._helpers import UDFResourcesProperty +from google.cloud.bigquery._helpers import _EnumApiResourceProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _query_param_from_api_repr +from google.cloud.bigquery._helpers import _TypedApiResourceProperty from google.cloud.bigquery._helpers import _TypedProperty _DONE_STATE = 'DONE' @@ -116,7 +120,7 @@ def __set__(self, instance, value): setattr(instance._configuration, self._backing_name, value) -class Compression(_EnumProperty): +class Compression(_EnumApiResourceProperty): """Pseudo-enum for ``compression`` properties.""" GZIP = 'GZIP' NONE 
= 'NONE' @@ -128,7 +132,7 @@ class CreateDisposition(_EnumProperty): CREATE_NEVER = 'CREATE_NEVER' -class DestinationFormat(_EnumProperty): +class DestinationFormat(_EnumApiResourceProperty): """Pseudo-enum for ``destination_format`` properties.""" CSV = 'CSV' NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' @@ -401,6 +405,7 @@ def begin(self, client=None): client = self._require_client(client) path = '/projects/%s/jobs' % (self.project,) + api_response = client._connection.api_request( method='POST', path=path, data=self._build_resource()) self._set_properties(api_response) @@ -973,62 +978,126 @@ def from_api_repr(cls, resource, client): return job -class _ExtractConfiguration(object): - """User-settable configuration options for extract jobs. +class ExtractJobConfig(object): + """Configuration options for extract jobs. - Values which are ``None`` -> server defaults. + All properties in this class are optional. Values which are ``None`` -> + server defaults. """ - _compression = None - _destination_format = None - _field_delimiter = None - _print_header = None + + def __init__(self): + self._properties = {} + + compression = Compression('compression', 'compression') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.compression + """ + + destination_format = DestinationFormat( + 'destination_format', 'destinationFormat') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.destinationFormat + """ + + field_delimiter = _TypedApiResourceProperty( + 'field_delimiter', 'fieldDelimiter', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.fieldDelimiter + """ + + print_header = _TypedApiResourceProperty( + 'print_header', 'printHeader', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.printHeader + """ + + def to_api_repr(self): + """Build an API representation of the 
extract job config. + + :rtype: dict + :returns: A dictionary in the format used by the BigQuery API. + """ + return copy.deepcopy(self._properties) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct a job configuration given its API representation + + :type resource: dict + :param resource: + An extract job configuration in the same representation as is + returned from the API. + + :rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig` + :returns: Configuration parsed from ``resource``. + """ + config = cls() + config._properties = copy.deepcopy(resource) + return config class ExtractJob(_AsyncJob): """Asynchronous job: extract data from a table into Cloud Storage. :type job_id: str - :param job_id: the job's ID, within the project belonging to ``client``. + :param job_id: the job's ID - :type source: :class:`google.cloud.bigquery.table.Table` + :type source: :class:`google.cloud.bigquery.table.TableReference` :param source: Table into which data is to be loaded. :type destination_uris: list of string - :param destination_uris: URIs describing Cloud Storage blobs into which - extracted data will be written, in format - ``gs:///``. + :param destination_uris: + URIs describing where the extracted data will be written in Cloud + Storage, using the format ``gs:///``. :type client: :class:`google.cloud.bigquery.client.Client` - :param client: A client which holds credentials and project configuration - for the dataset (which requires a project). + :param client: + A client which holds credentials and project configuration. + + :type job_config: :class:`~google.cloud.bigquery.job.ExtractJobConfig` + :param job_config: + (Optional) Extra configuration options for the extract job.
""" _JOB_TYPE = 'extract' - def __init__(self, job_id, source, destination_uris, client): + def __init__( + self, job_id, source, destination_uris, client, job_config=None): super(ExtractJob, self).__init__(job_id, client) + + if job_config is None: + job_config = ExtractJobConfig() + self.source = source self.destination_uris = destination_uris - self._configuration = _ExtractConfiguration() + self._configuration = job_config - compression = Compression('compression') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.compression - """ + @property + def compression(self): + """See + :class:`~google.cloud.bigquery.job.ExtractJobConfig.compression`. + """ + return self._configuration.compression - destination_format = DestinationFormat('destination_format') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.destinationFormat - """ + @property + def destination_format(self): + """See + :class:`~google.cloud.bigquery.job.ExtractJobConfig.destination_format`. + """ + return self._configuration.destination_format - field_delimiter = _TypedProperty('field_delimiter', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.fieldDelimiter - """ + @property + def field_delimiter(self): + """See + :class:`~google.cloud.bigquery.job.ExtractJobConfig.field_delimiter`. + """ + return self._configuration.field_delimiter - print_header = _TypedProperty('print_header', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.printHeader - """ + @property + def print_header(self): + """See + :class:`~google.cloud.bigquery.job.ExtractJobConfig.print_header`. 
+ """ + return self._configuration.print_header @property def destination_uri_file_counts(self): @@ -1046,50 +1115,34 @@ def destination_uri_file_counts(self): result = int(result) return result - def _populate_config_resource(self, configuration): - """Helper for _build_resource: copy config properties to resource""" - if self.compression is not None: - configuration['compression'] = self.compression - if self.destination_format is not None: - configuration['destinationFormat'] = self.destination_format - if self.field_delimiter is not None: - configuration['fieldDelimiter'] = self.field_delimiter - if self.print_header is not None: - configuration['printHeader'] = self.print_header - def _build_resource(self): """Generate a resource for :meth:`begin`.""" source_ref = { - 'projectId': self.source.project, - 'datasetId': self.source.dataset_id, + 'projectId': self.source.dataset.project, + 'datasetId': self.source.dataset.dataset_id, 'tableId': self.source.table_id, } + configuration = self._configuration.to_api_repr() + configuration['sourceTable'] = source_ref + configuration['destinationUris'] = self.destination_uris + resource = { 'jobReference': { 'projectId': self.project, 'jobId': self.job_id, }, 'configuration': { - self._JOB_TYPE: { - 'sourceTable': source_ref, - 'destinationUris': self.destination_uris, - }, + self._JOB_TYPE: configuration, }, } - configuration = resource['configuration'][self._JOB_TYPE] - self._populate_config_resource(configuration) return resource def _copy_configuration_properties(self, configuration): """Helper: assign subclass configuration properties in cleaned.""" - self.compression = configuration.get('compression') - self.destination_format = configuration.get('destinationFormat') - self.field_delimiter = configuration.get('fieldDelimiter') - self.print_header = _bool_or_none( - configuration.get('printHeader')) + self._configuration._properties = copy.deepcopy(configuration) @classmethod def from_api_repr(cls, resource, 
client): @@ -1110,13 +1163,16 @@ def from_api_repr(cls, resource, client): :rtype: :class:`google.cloud.bigquery.job.ExtractJob` :returns: Job parsed from ``resource``. """ - job_id, config = cls._get_resource_config(resource) - source_config = config['sourceTable'] - dataset = Dataset(source_config['datasetId'], client) - table_ref = TableReference(dataset, source_config['tableId']) - source = Table(table_ref, client=client) - destination_uris = config['destinationUris'] - job = cls(job_id, source, destination_uris, client=client) + job_id, config_resource = cls._get_resource_config(resource) + config = ExtractJobConfig.from_api_repr(config_resource) + source_config = config_resource['sourceTable'] + dataset = DatasetReference( + source_config['projectId'], source_config['datasetId']) + source = dataset.table(source_config['tableId']) + destination_uris = config_resource['destinationUris'] + + job = cls( + job_id, source, destination_uris, client=client, job_config=config) job._set_properties(resource) return job diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index e44cde5c9de2..5ba2a19cf271 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -543,6 +543,107 @@ def test_load_table_from_storage_w_autodetect_schema(self): self.assertEqual( sorted(actual_rows, key=by_age), sorted(rows, key=by_age)) + def _load_table_for_extract_table( + self, storage_client, rows, bucket_name, blob_name, table): + from google.cloud._testing import _NamedTemporaryFile + + local_id = unique_resource_id() + gs_url = 'gs://{}/{}'.format(bucket_name, blob_name) + + # In the **very** rare case the bucket name is reserved, this + # fails with a ConnectionError. 
+ bucket = storage_client.create_bucket(bucket_name) + self.to_delete.append(bucket) + blob = bucket.blob(blob_name) + + with _NamedTemporaryFile() as temp: + with open(temp.name, 'w') as csv_write: + writer = csv.writer(csv_write) + writer.writerow(('Full Name', 'Age')) + writer.writerows(rows) + + with open(temp.name, 'rb') as csv_read: + blob.upload_from_file(csv_read, content_type='text/csv') + self.to_delete.insert(0, blob) + + dataset = retry_403(Config.CLIENT.create_dataset)( + Dataset(table.dataset.dataset_id)) + self.to_delete.append(dataset) + table = dataset.table(table.table_id) + self.to_delete.insert(0, table) + job = Config.CLIENT.load_table_from_storage( + 'bq_extract_storage_test_' + local_id, table, gs_url) + job.autodetect = True + job.begin() + # Allow for 90 seconds of "warm up" before rows visible. See + # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability + # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds + retry = RetryInstanceState(_job_done, max_tries=8) + retry(job.reload)() + + def test_extract_table(self): + from google.cloud.storage import Client as StorageClient + + storage_client = StorageClient() + local_id = unique_resource_id() + bucket_name = 'bq_extract_test' + local_id + blob_name = 'person_ages.csv' + dataset_id = _make_dataset_id('load_gcs_then_extract') + table_id = 'test_table' + table = Config.CLIENT.dataset(dataset_id).table(table_id) + rows = [ + ('Phred Phlyntstone', 32), + ('Bharney Rhubble', 33), + ('Wylma Phlyntstone', 29), + ('Bhettye Rhubble', 27), + ] + self._load_table_for_extract_table( + storage_client, rows, bucket_name, blob_name, table) + bucket = storage_client.bucket(bucket_name) + destination_blob_name = 'person_ages_out.csv' + destination = bucket.blob(destination_blob_name) + destination_uri = 'gs://{}/person_ages_out.csv'.format(bucket_name) + + job = Config.CLIENT.extract_table(table, destination_uri) + job.result() + + self.to_delete.insert(0, destination) + got = 
destination.download_as_string().decode('utf-8') + self.assertIn('Bharney Rhubble', got) + + def test_extract_table_w_job_config(self): + from google.cloud.storage import Client as StorageClient + + storage_client = StorageClient() + local_id = unique_resource_id() + bucket_name = 'bq_extract_test' + local_id + blob_name = 'person_ages.csv' + dataset_id = _make_dataset_id('load_gcs_then_extract') + table_id = 'test_table' + table = Config.CLIENT.dataset(dataset_id).table(table_id) + rows = [ + ('Phred Phlyntstone', 32), + ('Bharney Rhubble', 33), + ('Wylma Phlyntstone', 29), + ('Bhettye Rhubble', 27), + ] + self._load_table_for_extract_table( + storage_client, rows, bucket_name, blob_name, table) + bucket = storage_client.bucket(bucket_name) + destination_blob_name = 'person_ages_out.csv' + destination = bucket.blob(destination_blob_name) + destination_uri = 'gs://{}/person_ages_out.csv'.format(bucket_name) + + job_config = bigquery.ExtractJobConfig() + job_config.destination_format = 'NEWLINE_DELIMITED_JSON' + job = Config.CLIENT.extract_table( + table, destination_uri, job_config=job_config) + job.result() + + self.to_delete.insert(0, destination) + got = destination.download_as_string().decode('utf-8') + self.assertIn('"Bharney Rhubble"', got) + def test_job_cancel(self): DATASET_ID = _make_dataset_id('job_cancel') JOB_NAME = 'fetch_' + DATASET_ID diff --git a/bigquery/tests/unit/test__helpers.py b/bigquery/tests/unit/test__helpers.py index b14915d866f8..6cffb6b58d62 100644 --- a/bigquery/tests/unit/test__helpers.py +++ b/bigquery/tests/unit/test__helpers.py @@ -763,6 +763,49 @@ def __init__(self): self.assertIsNone(wrapper._configuration._attr) +class Test_TypedApiResourceProperty(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import _TypedApiResourceProperty + + return _TypedApiResourceProperty + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_it(self): + + 
class Wrapper(object): + attr = self._make_one('attr', 'back', int) + + def __init__(self): + self._properties = {} + + self.assertIsNotNone(Wrapper.attr) + + wrapper = Wrapper() + with self.assertRaises(ValueError): + wrapper.attr = 'BOGUS' + + wrapper.attr = 42 + self.assertEqual(wrapper.attr, 42) + self.assertEqual(wrapper._properties['back'], 42) + + wrapper.attr = None + self.assertIsNone(wrapper.attr) + self.assertIsNone(wrapper._properties['back']) + + wrapper.attr = 23 + self.assertEqual(wrapper.attr, 23) + self.assertEqual(wrapper._properties['back'], 23) + + del wrapper.attr + self.assertIsNone(wrapper.attr) + with self.assertRaises(KeyError): + wrapper._properties['back'] + + class Test_TypedProperty(unittest.TestCase): @staticmethod diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 12282e47d931..d49d8ba4391c 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -15,6 +15,7 @@ import unittest import mock +import six def _make_credentials(): @@ -714,26 +715,105 @@ def test_copy_table(self): self.assertEqual(list(job.sources), [source]) self.assertIs(job.destination, destination) - def test_extract_table_to_storage(self): + def test_extract_table(self): from google.cloud.bigquery.job import ExtractJob PROJECT = 'PROJECT' - JOB = 'job_name' - DATASET = 'dataset_name' + JOB = 'job_id' + DATASET = 'dataset_id' SOURCE = 'source_table' DESTINATION = 'gs://bucket_name/object_name' + RESOURCE = { + 'jobReference': { + 'projectId': PROJECT, + 'jobId': JOB, + }, + 'configuration': { + 'extract': { + 'sourceTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': SOURCE, + }, + 'destinationUris': [DESTINATION], + }, + }, + } creds = _make_credentials() http = object() client = self._make_one(project=PROJECT, credentials=creds, _http=http) + conn = client._connection = _Connection(RESOURCE) dataset = client.dataset(DATASET) source = dataset.table(SOURCE) - job = 
client.extract_table_to_storage(JOB, source, DESTINATION) + + job = client.extract_table(source, DESTINATION, job_id=JOB) + + # Check that extract_table actually starts the job. + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/projects/PROJECT/jobs') + + # Check the job resource. self.assertIsInstance(job, ExtractJob) self.assertIs(job._client, client) self.assertEqual(job.job_id, JOB) self.assertEqual(job.source, source) self.assertEqual(list(job.destination_uris), [DESTINATION]) + def test_extract_table_generated_job_id(self): + from google.cloud.bigquery.job import ExtractJob + from google.cloud.bigquery.job import ExtractJobConfig + from google.cloud.bigquery.job import DestinationFormat + + PROJECT = 'PROJECT' + JOB = 'job_id' + DATASET = 'dataset_id' + SOURCE = 'source_table' + DESTINATION = 'gs://bucket_name/object_name' + RESOURCE = { + 'jobReference': { + 'projectId': PROJECT, + 'jobId': JOB, + }, + 'configuration': { + 'extract': { + 'sourceTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': SOURCE, + }, + 'destinationUris': [DESTINATION], + 'destinationFormat': 'NEWLINE_DELIMITED_JSON', + }, + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, _http=http) + conn = client._connection = _Connection(RESOURCE) + dataset = client.dataset(DATASET) + source = dataset.table(SOURCE) + job_config = ExtractJobConfig() + job_config.destination_format = ( + DestinationFormat.NEWLINE_DELIMITED_JSON) + + job = client.extract_table(source, DESTINATION, job_config=job_config) + + # Check that extract_table actually starts the job. 
+ self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/projects/PROJECT/jobs') + self.assertIsInstance( + req['data']['jobReference']['jobId'], six.string_types) + + # Check the job resource. + self.assertIsInstance(job, ExtractJob) + self.assertIs(job._client, client) + self.assertEqual(job.source, source) + self.assertEqual(list(job.destination_uris), [DESTINATION]) + def test_run_async_query_defaults(self): from google.cloud.bigquery.job import QueryJob diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 11f4dec9870c..1da83260f06f 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -17,6 +17,9 @@ from six.moves import http_client import unittest +from google.cloud.bigquery.job import ExtractJobConfig +from google.cloud.bigquery.dataset import DatasetReference + class Test__bool_or_none(unittest.TestCase): @@ -1217,31 +1220,31 @@ def _verifyResourceProperties(self, job, resource): self.assertEqual(job.destination_uris, config['destinationUris']) table_ref = config['sourceTable'] - self.assertEqual(job.source.project, table_ref['projectId']) - self.assertEqual(job.source.dataset_id, table_ref['datasetId']) + self.assertEqual(job.source.dataset.project, table_ref['projectId']) + self.assertEqual(job.source.dataset.dataset_id, table_ref['datasetId']) self.assertEqual(job.source.table_id, table_ref['tableId']) if 'compression' in config: - self.assertEqual(job.compression, - config['compression']) + self.assertEqual( + job.compression, config['compression']) else: self.assertIsNone(job.compression) if 'destinationFormat' in config: - self.assertEqual(job.destination_format, - config['destinationFormat']) + self.assertEqual( + job.destination_format, config['destinationFormat']) else: self.assertIsNone(job.destination_format) if 'fieldDelimiter' in config: - self.assertEqual(job.field_delimiter, - 
config['fieldDelimiter']) + self.assertEqual( + job.field_delimiter, config['fieldDelimiter']) else: self.assertIsNone(job.field_delimiter) if 'printHeader' in config: - self.assertEqual(job.print_header, - config['printHeader']) + self.assertEqual( + job.print_header, config['printHeader']) else: self.assertIsNone(job.print_header) @@ -1260,7 +1263,7 @@ def test_ctor(self): self._verifyInitialReadonlyProperties(job) - # set/read from resource['configuration']['copy'] + # set/read from resource['configuration']['extract'] self.assertIsNone(job.compression) self.assertIsNone(job.destination_format) self.assertIsNone(job.field_delimiter) @@ -1350,7 +1353,8 @@ def test_begin_w_bound_client(self): del RESOURCE['user_email'] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) - source = _Table(self.SOURCE_TABLE) + source_dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_NAME, source, [self.DESTINATION_URI], client) @@ -1399,14 +1403,15 @@ def test_begin_w_alternate_client(self): client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection(RESOURCE) client2 = _Client(project=self.PROJECT, connection=conn2) - source = _Table(self.SOURCE_TABLE) + source_dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = source_dataset.table(self.SOURCE_TABLE) + job_config = ExtractJobConfig() + job_config.compression = 'GZIP' + job_config.destination_format = 'NEWLINE_DELIMITED_JSON' + job_config.field_delimiter = '|' + job_config.print_header = False job = self._make_one(self.JOB_NAME, source, [self.DESTINATION_URI], - client1) - - job.compression = 'GZIP' - job.destination_format = 'NEWLINE_DELIMITED_JSON' - job.field_delimiter = '|' - job.print_header = False + client1, job_config) job.begin(client=client2) @@ -1467,7 +1472,8 @@ def test_reload_w_bound_client(self): RESOURCE = self._makeResource() conn = _Connection(RESOURCE) client = 
_Client(project=self.PROJECT, connection=conn) - source = _Table(self.SOURCE_TABLE) + source_dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_NAME, source, [self.DESTINATION_URI], client) @@ -1486,7 +1492,8 @@ def test_reload_w_alternate_client(self): client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection(RESOURCE) client2 = _Client(project=self.PROJECT, connection=conn2) - source = _Table(self.SOURCE_TABLE) + source_dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_NAME, source, [self.DESTINATION_URI], client1)