From 01fc4f47c50894c2ca77b7d04c6fbad366baaeb5 Mon Sep 17 00:00:00 2001
From: Jonathan Amsterdam
Date: Wed, 4 Oct 2017 14:32:42 -0400
Subject: [PATCH] bigquery: modify LoadJob (#4103)

This PR handles loading from GCS. Loading from a local file will be done
separately.
---
 bigquery/google/cloud/bigquery/__init__.py |   2 +
 bigquery/google/cloud/bigquery/client.py   |  31 +-
 bigquery/google/cloud/bigquery/job.py      | 407 ++++++++++-----------
 bigquery/tests/system.py                   |  34 +-
 bigquery/tests/unit/test_client.py         |  41 ++-
 bigquery/tests/unit/test_job.py            | 231 ++++++------
 6 files changed, 376 insertions(+), 370 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py
index 7bbcc7782ee2..3a7cc2be7a69 100644
--- a/bigquery/google/cloud/bigquery/__init__.py
+++ b/bigquery/google/cloud/bigquery/__init__.py
@@ -35,6 +35,7 @@
 from google.cloud.bigquery.job import CopyJobConfig
 from google.cloud.bigquery.job import ExtractJobConfig
 from google.cloud.bigquery.job import QueryJobConfig
+from google.cloud.bigquery.job import LoadJobConfig
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import Table

@@ -47,6 +48,7 @@
     'CopyJobConfig',
     'ExtractJobConfig',
     'QueryJobConfig',
+    'LoadJobConfig',
     'ScalarQueryParameter',
     'SchemaField',
     'StructQueryParameter',
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 7ceed4fc1e41..da69decd03c8 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -19,6 +19,8 @@
 import collections
 import uuid

+import six
+
 from google.api.core import page_iterator
 from google.cloud.client import ClientWithProject
 from google.cloud.bigquery._http import Connection
@@ -490,26 +492,37 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
             max_results=max_results, extra_params=extra_params)

-    def load_table_from_storage(self, job_id, destination, *source_uris):
-        """Construct a job for loading data into a table from CloudStorage.
+    def load_table_from_storage(self, source_uris, destination,
+                                job_id=None, job_config=None):
+        """Start a job for loading data into a table from Cloud Storage.

         See
         https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load

-        :type job_id: str
-        :param job_id: Name of the job.
+        :type source_uris: One of:
+                           str
+                           sequence of string
+        :param source_uris: URIs of data files to be loaded; in format
+                            ``gs://<bucket_name>/<object_name_or_glob>``.

-        :type destination: :class:`google.cloud.bigquery.table.Table`
+        :type destination: :class:`google.cloud.bigquery.table.TableReference`
         :param destination: Table into which data is to be loaded.

-        :type source_uris: sequence of string
-        :param source_uris: URIs of data files to be loaded; in format
-                            ``gs://<bucket_name>/<object_name_or_glob>``.
+        :type job_id: str
+        :param job_id: (Optional) Name of the job; a random ID is generated
+                       if not passed.
+
+        :type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
+        :param job_config: (Optional) Extra configuration options for the job.

         :rtype: :class:`google.cloud.bigquery.job.LoadJob`
         :returns: a new ``LoadJob`` instance
         """
-        return LoadJob(job_id, destination, source_uris, client=self)
+        job_id = _make_job_id(job_id)
+        if isinstance(source_uris, six.string_types):
+            source_uris = [source_uris]
+        job = LoadJob(job_id, source_uris, destination, self, job_config)
+        job.begin()
+        return job

     def copy_table(self, sources, destination, job_id=None, job_config=None):
         """Start a job for copying one or more tables into another table.
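A minimal usage sketch of the new interface above (the dataset, table, and
bucket names here are illustrative); ``load_table_from_storage`` now begins
the job itself, so no separate ``job.begin()`` call is needed:

    from google.cloud import bigquery

    client = bigquery.Client()

    config = bigquery.LoadJobConfig()
    config.skip_leading_rows = 1
    config.source_format = 'CSV'

    # Either a single URI or a sequence of URIs may be passed.
    job = client.load_table_from_storage(
        'gs://my-bucket/data.csv',
        client.dataset('my_dataset').table('my_table'),
        job_config=config)
    job.result()  # block until the load completes
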
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 812dde4b32a3..fd427c647a55 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -24,10 +24,8 @@ from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud._helpers import _datetime_from_microseconds -from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.schema import SchemaField -from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import TableReference from google.cloud.bigquery.table import _build_schema_resource from google.cloud.bigquery.table import _parse_schema_resource @@ -106,20 +104,6 @@ def _error_result_to_exception(error_result): status_code, error_result.get('message', ''), errors=[error_result]) -class AutoDetectSchema(_TypedProperty): - """Typed Property for ``autodetect`` properties. - - :raises ValueError: on ``set`` operation if ``instance.schema`` - is already defined. - """ - def __set__(self, instance, value): - self._validate(value) - if instance.schema: - raise ValueError('A schema should not be already defined ' - 'when using schema auto-detection') - setattr(instance._configuration, self._backing_name, value) - - class Compression(_EnumApiResourceProperty): """Pseudo-enum for ``compression`` properties.""" GZIP = 'GZIP' @@ -139,7 +123,7 @@ class DestinationFormat(_EnumApiResourceProperty): AVRO = 'AVRO' -class Encoding(_EnumProperty): +class Encoding(_EnumApiResourceProperty): """Pseudo-enum for ``encoding`` properties.""" UTF_8 = 'UTF-8' ISO_8559_1 = 'ISO-8559-1' @@ -151,7 +135,7 @@ class QueryPriority(_EnumProperty): BATCH = 'BATCH' -class SourceFormat(_EnumProperty): +class SourceFormat(_EnumApiResourceProperty): """Pseudo-enum for ``source_format`` properties.""" CSV = 'CSV' DATASTORE_BACKUP = 'DATASTORE_BACKUP' @@ -166,6 +150,20 @@ class WriteDisposition(_EnumApiResourceProperty): WRITE_EMPTY = 'WRITE_EMPTY' +class AutoDetectSchema(_TypedApiResourceProperty): + """Property for ``autodetect`` properties. + + :raises ValueError: on ``set`` operation if ``instance.schema`` + is already defined. + """ + def __set__(self, instance, value): + self._validate(value) + if instance.schema: + raise ValueError('A schema should not be already defined ' + 'when using schema auto-detection') + instance._properties[self.resource_name] = value + + class _AsyncJob(google.api.core.future.polling.PollingFuture): """Base class for asynchronous jobs. @@ -542,35 +540,151 @@ def cancelled(self): and self.error_result.get('reason') == _STOPPED_REASON) -class _LoadConfiguration(object): - """User-settable configuration options for load jobs. +class LoadJobConfig(object): + """Configuration options for load jobs. - Values which are ``None`` -> server defaults. + All properties in this class are optional. Values which are ``None`` -> + server defaults. 
""" - _allow_jagged_rows = None - _allow_quoted_newlines = None - _autodetect = None - _create_disposition = None - _encoding = None - _field_delimiter = None - _ignore_unknown_values = None - _max_bad_records = None - _null_marker = None - _quote_character = None - _skip_leading_rows = None - _source_format = None - _write_disposition = None + + def __init__(self): + self._properties = {} + self._schema = () + + allow_jagged_rows = _TypedApiResourceProperty( + 'allow_jagged_rows', 'allowJaggedRows', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowJaggedRows + """ + + allow_quoted_newlines = _TypedApiResourceProperty( + 'allow_quoted_newlines', 'allowQuotedNewlines', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines + """ + + autodetect = AutoDetectSchema('autodetect', 'autodetect', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect + """ + + create_disposition = CreateDisposition('create_disposition', + 'createDisposition') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition + """ + + encoding = Encoding('encoding', 'encoding') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.encoding + """ + + field_delimiter = _TypedApiResourceProperty( + 'field_delimiter', 'fieldDelimiter', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.fieldDelimiter + """ + + ignore_unknown_values = _TypedApiResourceProperty( + 'ignore_unknown_values', 'ignoreUnknownValues', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.ignoreUnknownValues + """ + + max_bad_records = _TypedApiResourceProperty( + 'max_bad_records', 'maxBadRecords', six.integer_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.maxBadRecords + """ + + null_marker = _TypedApiResourceProperty( + 'null_marker', 'nullMarker', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.nullMarker + """ + + quote_character = _TypedApiResourceProperty( + 'quote_character', 'quote', six.string_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.quote + """ + + skip_leading_rows = _TypedApiResourceProperty( + 'skip_leading_rows', 'skipLeadingRows', six.integer_types) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.skipLeadingRows + """ + + source_format = SourceFormat('source_format', 'sourceFormat') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat + """ + + write_disposition = WriteDisposition('write_disposition', + 'writeDisposition') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition + """ + + @property + def schema(self): + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema + """ + return list(self._schema) + + @schema.setter + def schema(self, value): + if not all(isinstance(field, SchemaField) for field in value): + raise ValueError('Schema items must be fields') + if self.autodetect: + raise ValueError( + 'Schema can not be set if `autodetect` property is True') + self._schema = tuple(value) + + def to_api_repr(self): + 
"""Build an API representation of the load job config. + + :rtype: dict + :returns: A dictionary in the format used by the BigQuery API. + """ + config = copy.deepcopy(self._properties) + if len(self.schema) > 0: + config['schema'] = {'fields': _build_schema_resource(self.schema)} + # skipLeadingRows is a string because it's defined as an int64, which + # can't be represented as a JSON number. + slr = config.get('skipLeadingRows') + if slr is not None: + config['skipLeadingRows'] = str(slr) + return config + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct a job configuration given its API representation + + :type resource: dict + :param resource: + An extract job configuration in the same representation as is + returned from the API. + + :rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig` + :returns: Configuration parsed from ``resource``. + """ + schema = resource.pop('schema', {'fields': ()}) + slr = resource.pop('skipLeadingRows', None) + config = cls() + config._properties = copy.deepcopy(resource) + config.schema = _parse_schema_resource(schema) + config.skip_leading_rows = _int_or_none(slr) class LoadJob(_AsyncJob): - """Asynchronous job for loading data into a table from remote URI. + """Asynchronous job for loading data into a table. - :type job_id: str - :param job_id: - The job's ID, belonging to the project associated with the client. + Can load from Google Cloud Storage URIs or from a file. - :type destination: :class:`google.cloud.bigquery.table.Table` - :param destination: Table into which data is to be loaded. + :type job_id: str + :param job_id: the job's ID :type source_uris: sequence of string :param source_uris: @@ -578,56 +692,34 @@ class LoadJob(_AsyncJob): https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceUris for supported URI formats. + :type destination: :class:`google.cloud.bigquery.table.TableReference` + :param destination: reference to table into which data is to be loaded. + :type client: :class:`google.cloud.bigquery.client.Client` :param client: A client which holds credentials and project configuration for the dataset (which requires a project). - - :type schema: list of :class:`google.cloud.bigquery.table.SchemaField` - :param schema: The job's schema """ - _schema = None _JOB_TYPE = 'load' - def __init__(self, name, destination, source_uris, client, schema=()): - super(LoadJob, self).__init__(name, client) - self.destination = destination - self.source_uris = source_uris - self._configuration = _LoadConfiguration() - # Let the @property do validation. This must occur after all other - # attributes have been set. - self.schema = schema + def __init__(self, job_id, source_uris, destination, client, + job_config=None): + super(LoadJob, self).__init__(job_id, client) - @property - def schema(self): - """Table's schema. - - :rtype: list of :class:`SchemaField` - :returns: fields describing the schema - """ - return list(self._schema) + if job_config is None: + job_config = LoadJobConfig() - @schema.setter - def schema(self, value): - """Update table's schema + self.source_uris = source_uris + self.destination = destination + self._configuration = job_config - :type value: list of :class:`SchemaField` - :param value: fields describing the schema + @property + def configuration(self): + """Configuration for this job. - :raises TypeError: If ``value`is not a sequence. - :raises ValueError: If any item in the sequence is not - a ``SchemaField``. 
+ :rtype: :class:`~google.cloud.bigquery.job.LoadJobConfig` """ - if not value: - self._schema = () - else: - if not all(isinstance(field, SchemaField) for field in value): - raise ValueError('Schema items must be fields') - if self.autodetect: - raise ValueError( - 'Schema can not be set if `autodetect` property is True') - - self._schema = tuple(value) + return self._configuration @property def input_file_bytes(self): @@ -673,155 +765,25 @@ def output_rows(self): if statistics is not None: return int(statistics['load']['outputRows']) - allow_jagged_rows = _TypedProperty('allow_jagged_rows', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowJaggedRows - """ - - allow_quoted_newlines = _TypedProperty('allow_quoted_newlines', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines - """ - - autodetect = AutoDetectSchema('autodetect', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect - """ - - create_disposition = CreateDisposition('create_disposition', - 'createDisposition') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition - """ - - encoding = Encoding('encoding') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.encoding - """ - - field_delimiter = _TypedProperty('field_delimiter', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.fieldDelimiter - """ - - ignore_unknown_values = _TypedProperty('ignore_unknown_values', bool) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.ignoreUnknownValues - """ - - max_bad_records = _TypedProperty('max_bad_records', six.integer_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.maxBadRecords - """ - - null_marker = _TypedProperty('null_marker', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.nullMarker - """ - - quote_character = _TypedProperty('quote_character', six.string_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.quote - """ - - skip_leading_rows = _TypedProperty('skip_leading_rows', six.integer_types) - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.skipLeadingRows - """ - - source_format = SourceFormat('source_format') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat - """ - - write_disposition = WriteDisposition('write_disposition', - 'writeDisposition') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition - """ - - def _populate_config_resource(self, configuration): - """Helper for _build_resource: copy config properties to resource""" - if self.allow_jagged_rows is not None: - configuration['allowJaggedRows'] = self.allow_jagged_rows - if self.allow_quoted_newlines is not None: - configuration['allowQuotedNewlines'] = self.allow_quoted_newlines - if self.autodetect is not None: - configuration['autodetect'] = self.autodetect - if self.create_disposition is not None: - configuration['createDisposition'] = self.create_disposition - if self.encoding is not None: - configuration['encoding'] = self.encoding - if self.field_delimiter is not None: - 
configuration['fieldDelimiter'] = self.field_delimiter - if self.ignore_unknown_values is not None: - configuration['ignoreUnknownValues'] = self.ignore_unknown_values - if self.max_bad_records is not None: - configuration['maxBadRecords'] = self.max_bad_records - if self.null_marker is not None: - configuration['nullMarker'] = self.null_marker - if self.quote_character is not None: - configuration['quote'] = self.quote_character - if self.skip_leading_rows is not None: - configuration['skipLeadingRows'] = str(self.skip_leading_rows) - if self.source_format is not None: - configuration['sourceFormat'] = self.source_format - if self.write_disposition is not None: - configuration['writeDisposition'] = self.write_disposition - def _build_resource(self): """Generate a resource for :meth:`begin`.""" - resource = { + configuration = self._configuration.to_api_repr() + configuration['sourceUris'] = self.source_uris + configuration['destinationTable'] = self.destination.to_api_repr() + + return { 'jobReference': { 'projectId': self.project, 'jobId': self.job_id, }, 'configuration': { - self._JOB_TYPE: { - 'sourceUris': self.source_uris, - 'destinationTable': { - 'projectId': self.destination.project, - 'datasetId': self.destination.dataset_id, - 'tableId': self.destination.table_id, - }, - }, + self._JOB_TYPE: configuration, }, } - configuration = resource['configuration'][self._JOB_TYPE] - self._populate_config_resource(configuration) - - if len(self.schema) > 0: - configuration['schema'] = { - 'fields': _build_schema_resource(self.schema)} - - return resource - - def _scrub_local_properties(self, cleaned): - """Helper: handle subclass properties in cleaned.""" - schema = cleaned.pop('schema', {'fields': ()}) - self.schema = _parse_schema_resource(schema) def _copy_configuration_properties(self, configuration): """Helper: assign subclass configuration properties in cleaned.""" - self.allow_jagged_rows = _bool_or_none( - configuration.get('allowJaggedRows')) - self.allow_quoted_newlines = _bool_or_none( - configuration.get('allowQuotedNewlines')) - self.autodetect = _bool_or_none( - configuration.get('autodetect')) - self.create_disposition = configuration.get('createDisposition') - self.encoding = configuration.get('encoding') - self.field_delimiter = configuration.get('fieldDelimiter') - self.ignore_unknown_values = _bool_or_none( - configuration.get('ignoreUnknownValues')) - self.max_bad_records = _int_or_none( - configuration.get('maxBadRecords')) - self.null_marker = configuration.get('nullMarker') - self.quote_character = configuration.get('quote') - self.skip_leading_rows = _int_or_none( - configuration.get('skipLeadingRows')) - self.source_format = configuration.get('sourceFormat') - self.write_disposition = configuration.get('writeDisposition') + self._configuration._properties = copy.deepcopy(configuration) @classmethod def from_api_repr(cls, resource, client): @@ -842,15 +804,16 @@ def from_api_repr(cls, resource, client): :rtype: :class:`google.cloud.bigquery.job.LoadJob` :returns: Job parsed from ``resource``. 
""" - job_id, config = cls._get_resource_config(resource) - dest_config = config['destinationTable'] + job_id, config_resource = cls._get_resource_config(resource) + config = LoadJobConfig.from_api_repr(config_resource) + dest_config = config_resource['destinationTable'] ds_ref = DatasetReference(dest_config['projectId'], dest_config['datasetId'],) - dataset = Dataset(ds_ref) - table_ref = TableReference(dataset, dest_config['tableId']) - destination = Table(table_ref, client=client) - source_urls = config.get('sourceUris', ()) - job = cls(job_id, destination, source_urls, client=client) + destination = TableReference(ds_ref, dest_config['tableId']) + # TODO(jba): sourceUris should not be absent if there are no LoadJobs + # for file uploads. + source_uris = config_resource.get('sourceUris') + job = cls(job_id, source_uris, destination, client, config) job._set_properties(resource) return job diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 2fd43f7951c4..ada6d92b5050 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -463,14 +463,13 @@ def test_load_table_from_storage_then_dump_table(self): table = retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) + config = bigquery.LoadJobConfig() + config.create_disposition = 'CREATE_NEVER' + config.skip_leading_rows = 1 + config.source_format = 'CSV' + config.write_disposition = 'WRITE_EMPTY' job = Config.CLIENT.load_table_from_storage( - 'bq_load_storage_test_' + local_id, table, GS_URL) - job.create_disposition = 'CREATE_NEVER' - job.skip_leading_rows = 1 - job.source_format = 'CSV' - job.write_disposition = 'WRITE_EMPTY' - - job.begin() + GS_URL, dataset.table(TABLE_NAME), job_config=config) # Allow for 90 seconds of "warm up" before rows visible. See # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability @@ -523,11 +522,10 @@ def test_load_table_from_storage_w_autodetect_schema(self): dataset = self.temp_dataset(_make_dataset_id('load_gcs_then_dump')) table_ref = dataset.table(table_name) - job = Config.CLIENT.load_table_from_storage( - 'bq_load_storage_test_' + local_id, table_ref, gs_url) - job.autodetect = True - - job.begin() + config = bigquery.LoadJobConfig() + config.autodetect = True + job = Config.CLIENT.load_table_from_storage(gs_url, table_ref, + job_config=config) # Allow for 90 seconds of "warm up" before rows visible. See # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability @@ -551,7 +549,6 @@ def _load_table_for_extract_table( self, storage_client, rows, bucket_name, blob_name, table): from google.cloud._testing import _NamedTemporaryFile - local_id = unique_resource_id() gs_url = 'gs://{}/{}'.format(bucket_name, blob_name) # In the **very** rare case the bucket name is reserved, this @@ -572,10 +569,11 @@ def _load_table_for_extract_table( dataset = self.temp_dataset(table.dataset_id) table_ref = dataset.table(table.table_id) - job = Config.CLIENT.load_table_from_storage( - 'bq_extract_storage_test_' + local_id, table_ref, gs_url) - job.autodetect = True - job.begin() + config = bigquery.LoadJobConfig() + config.autodetect = True + job = Config.CLIENT.load_table_from_storage(gs_url, table_ref, + job_config=config) + # TODO(jba): do we need this retry now that we have job.result()? # Allow for 90 seconds of "warm up" before rows visible. 
See # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds @@ -608,7 +606,7 @@ def test_extract_table(self): destination_uri = 'gs://{}/person_ages_out.csv'.format(bucket_name) job = Config.CLIENT.extract_table(table_ref, destination_uri) - job.result() + job.result(timeout=100) self.to_delete.insert(0, destination) got = destination.download_as_string().decode('utf-8') diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index d1a6d1218ae8..3f667039f497 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -1122,12 +1122,47 @@ def test_load_table_from_storage(self): DATASET = 'dataset_name' DESTINATION = 'destination_table' SOURCE_URI = 'http://example.com/source.csv' + RESOURCE = { + 'jobReference': { + 'projectId': PROJECT, + 'jobId': JOB, + }, + 'configuration': { + 'load': { + 'sourceUris': [SOURCE_URI], + 'destinationTable': { + 'projectId': PROJECT, + 'datasetId': DATASET, + 'tableId': DESTINATION, + }, + }, + }, + } creds = _make_credentials() http = object() client = self._make_one(project=PROJECT, credentials=creds, _http=http) - dataset = client.dataset(DATASET) - destination = dataset.table(DESTINATION) - job = client.load_table_from_storage(JOB, destination, SOURCE_URI) + conn = client._connection = _Connection(RESOURCE) + destination = client.dataset(DATASET).table(DESTINATION) + + job = client.load_table_from_storage(SOURCE_URI, destination, + job_id=JOB) + + # Check that load_table_from_storage actually starts the job. + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/projects/%s/jobs' % PROJECT) + + self.assertIsInstance(job, LoadJob) + self.assertIs(job._client, client) + self.assertEqual(job.job_id, JOB) + self.assertEqual(list(job.source_uris), [SOURCE_URI]) + self.assertIs(job.destination, destination) + + conn = client._connection = _Connection(RESOURCE) + + job = client.load_table_from_storage([SOURCE_URI], destination, + job_id=JOB) self.assertIsInstance(job, LoadJob) self.assertIs(job._client, client) self.assertEqual(job.job_id, JOB) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index d0a654c0c15d..e6b903bfebaf 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -18,6 +18,7 @@ import unittest from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig +from google.cloud.bigquery.job import LoadJobConfig from google.cloud.bigquery.dataset import DatasetReference @@ -84,12 +85,14 @@ def test_missing_reason(self): class _Base(object): from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.table import TableReference PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' DS_ID = 'datset_id' DS_REF = DatasetReference(PROJECT, DS_ID) TABLE_ID = 'table_id' + TABLE_REF = TableReference(DS_REF, TABLE_ID) JOB_NAME = 'job_name' def _make_one(self, *args, **kw): @@ -231,50 +234,53 @@ def _makeResource(self, started=False, ended=False): return resource def _verifyBooleanConfigProperties(self, job, config): + jconfig = job.configuration if 'allowJaggedRows' in config: - self.assertEqual(job.allow_jagged_rows, + self.assertEqual(jconfig.allow_jagged_rows, config['allowJaggedRows']) else: - self.assertIsNone(job.allow_jagged_rows) + self.assertIsNone(jconfig.allow_jagged_rows) if 'allowQuotedNewlines' in config: - 
self.assertEqual(job.allow_quoted_newlines, + self.assertEqual(jconfig.allow_quoted_newlines, config['allowQuotedNewlines']) else: - self.assertIsNone(job.allow_quoted_newlines) + self.assertIsNone(jconfig.allow_quoted_newlines) if 'autodetect' in config: self.assertEqual( - job.autodetect, config['autodetect']) + jconfig.autodetect, config['autodetect']) else: - self.assertIsNone(job.autodetect) + self.assertIsNone(jconfig.autodetect) if 'ignoreUnknownValues' in config: - self.assertEqual(job.ignore_unknown_values, + self.assertEqual(jconfig.ignore_unknown_values, config['ignoreUnknownValues']) else: - self.assertIsNone(job.ignore_unknown_values) + self.assertIsNone(jconfig.ignore_unknown_values) def _verifyEnumConfigProperties(self, job, config): + jconfig = job.configuration if 'createDisposition' in config: - self.assertEqual(job.create_disposition, + self.assertEqual(jconfig.create_disposition, config['createDisposition']) else: - self.assertIsNone(job.create_disposition) + self.assertIsNone(jconfig.create_disposition) if 'encoding' in config: - self.assertEqual(job.encoding, + self.assertEqual(jconfig.encoding, config['encoding']) else: - self.assertIsNone(job.encoding) + self.assertIsNone(jconfig.encoding) if 'sourceFormat' in config: - self.assertEqual(job.source_format, + self.assertEqual(jconfig.source_format, config['sourceFormat']) else: - self.assertIsNone(job.source_format) + self.assertIsNone(jconfig.source_format) if 'writeDisposition' in config: - self.assertEqual(job.write_disposition, + self.assertEqual(jconfig.write_disposition, config['writeDisposition']) else: - self.assertIsNone(job.write_disposition) + self.assertIsNone(jconfig.write_disposition) def _verifyResourceProperties(self, job, resource): + jconfig = job.configuration self._verifyReadonlyResourceProperties(job, resource) config = resource.get('configuration', {}).get('load') @@ -290,43 +296,43 @@ def _verifyResourceProperties(self, job, resource): self.assertEqual(job.destination.table_id, table_ref['tableId']) if 'fieldDelimiter' in config: - self.assertEqual(job.field_delimiter, + self.assertEqual(jconfig.field_delimiter, config['fieldDelimiter']) else: - self.assertIsNone(job.field_delimiter) + self.assertIsNone(jconfig.field_delimiter) if 'maxBadRecords' in config: - self.assertEqual(job.max_bad_records, + self.assertEqual(jconfig.max_bad_records, config['maxBadRecords']) else: - self.assertIsNone(job.max_bad_records) + self.assertIsNone(jconfig.max_bad_records) if 'nullMarker' in config: - self.assertEqual(job.null_marker, + self.assertEqual(jconfig.null_marker, config['nullMarker']) else: - self.assertIsNone(job.null_marker) + self.assertIsNone(jconfig.null_marker) if 'quote' in config: - self.assertEqual(job.quote_character, + self.assertEqual(jconfig.quote_character, config['quote']) else: - self.assertIsNone(job.quote_character) + self.assertIsNone(jconfig.quote_character) if 'skipLeadingRows' in config: - self.assertEqual(str(job.skip_leading_rows), + self.assertEqual(str(jconfig.skip_leading_rows), config['skipLeadingRows']) else: - self.assertIsNone(job.skip_leading_rows) + self.assertIsNone(jconfig.skip_leading_rows) def test_ctor(self): client = _Client(self.PROJECT) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) - self.assertIs(job.destination, table) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client) + self.assertIs(job.destination, self.TABLE_REF) self.assertEqual(list(job.source_uris), [self.SOURCE1]) 
self.assertIs(job._client, client) self.assertEqual(job.job_type, self.JOB_TYPE) self.assertEqual( job.path, '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) - self.assertEqual(job.schema, []) + self.assertEqual(job.configuration.schema, []) self._verifyInitialReadonlyProperties(job) @@ -337,30 +343,32 @@ def test_ctor(self): self.assertIsNone(job.output_rows) # set/read from resource['configuration']['load'] - self.assertIsNone(job.allow_jagged_rows) - self.assertIsNone(job.allow_quoted_newlines) - self.assertIsNone(job.autodetect) - self.assertIsNone(job.create_disposition) - self.assertIsNone(job.encoding) - self.assertIsNone(job.field_delimiter) - self.assertIsNone(job.ignore_unknown_values) - self.assertIsNone(job.max_bad_records) - self.assertIsNone(job.null_marker) - self.assertIsNone(job.quote_character) - self.assertIsNone(job.skip_leading_rows) - self.assertIsNone(job.source_format) - self.assertIsNone(job.write_disposition) - - def test_ctor_w_schema(self): + jconfig = job.configuration + self.assertIsNone(jconfig.allow_jagged_rows) + self.assertIsNone(jconfig.allow_quoted_newlines) + self.assertIsNone(jconfig.autodetect) + self.assertIsNone(jconfig.create_disposition) + self.assertIsNone(jconfig.encoding) + self.assertIsNone(jconfig.field_delimiter) + self.assertIsNone(jconfig.ignore_unknown_values) + self.assertIsNone(jconfig.max_bad_records) + self.assertIsNone(jconfig.null_marker) + self.assertIsNone(jconfig.quote_character) + self.assertIsNone(jconfig.skip_leading_rows) + self.assertIsNone(jconfig.source_format) + self.assertIsNone(jconfig.write_disposition) + + def test_ctor_w_config(self): from google.cloud.bigquery.schema import SchemaField client = _Client(self.PROJECT) - table = _Table() full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') age = SchemaField('age', 'INTEGER', mode='REQUIRED') - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client, - schema=[full_name, age]) - self.assertEqual(job.schema, [full_name, age]) + config = LoadJobConfig() + config.schema = [full_name, age] + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client, config) + self.assertEqual(job.configuration.schema, [full_name, age]) def test_done(self): client = _Client(self.PROJECT) @@ -377,15 +385,15 @@ def test_result(self): self.assertIs(result, job) - def test_result_invokes_begins(self): + def test_result_invokes_begin(self): begun_resource = self._makeResource() done_resource = copy.deepcopy(begun_resource) done_resource['status'] = {'state': 'DONE'} connection = _Connection(begun_resource, done_resource) client = _Client(self.PROJECT, connection=connection) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client) job.result() self.assertEqual(len(connection._requested), 2) @@ -394,67 +402,52 @@ def test_result_invokes_begins(self): self.assertEqual(reload_request['method'], 'GET') def test_schema_setter_non_list(self): - client = _Client(self.PROJECT) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + config = LoadJobConfig() with self.assertRaises(TypeError): - job.schema = object() + config.schema = object() def test_schema_setter_invalid_field(self): from google.cloud.bigquery.schema import SchemaField - client = _Client(self.PROJECT) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + config = LoadJobConfig() full_name = SchemaField('full_name', 
'STRING', mode='REQUIRED') with self.assertRaises(ValueError): - job.schema = [full_name, object()] + config.schema = [full_name, object()] def test_schema_setter(self): from google.cloud.bigquery.schema import SchemaField - client = _Client(self.PROJECT) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + config = LoadJobConfig() full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') age = SchemaField('age', 'INTEGER', mode='REQUIRED') - job.schema = [full_name, age] - self.assertEqual(job.schema, [full_name, age]) + config.schema = [full_name, age] + self.assertEqual(config.schema, [full_name, age]) def test_schema_setter_w_autodetect(self): from google.cloud.bigquery.schema import SchemaField - client = _Client(self.PROJECT) - table = _Table() - full_name = SchemaField('full_name', 'STRING') - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) - job.autodetect = False - job.schema = [full_name] - self.assertEqual(job.schema, [full_name]) - - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) - job.autodetect = True + config = LoadJobConfig() + schema = [SchemaField('full_name', 'STRING')] + config.autodetect = False + config.schema = schema + self.assertEqual(config.schema, schema) + + config.schema = [] + config.autodetect = True with self.assertRaises(ValueError): - job.schema = [full_name] + config.schema = schema def test_autodetect_setter_w_schema(self): from google.cloud.bigquery.schema import SchemaField - client = _Client(self.PROJECT) - table = _Table() - full_name = SchemaField('full_name', 'STRING') - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) - - job.autodetect = True - job.schema = [] - self.assertEqual(job.schema, []) + config = LoadJobConfig() - job.autodetect = False - job.schema = [full_name] - self.assertEqual(job.autodetect, False) + config.autodetect = False + config.schema = [SchemaField('full_name', 'STRING')] + self.assertEqual(config.autodetect, False) with self.assertRaises(ValueError): - job.autodetect = True + config.autodetect = True def test_props_set_by_server(self): import datetime @@ -475,7 +468,7 @@ def test_props_set_by_server(self): client = _Client(self.PROJECT) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client) job._properties['etag'] = 'ETAG' job._properties['id'] = JOB_ID job._properties['selfLink'] = URL @@ -578,8 +571,8 @@ def test_from_api_repr_w_properties(self): def test_begin_w_already_running(self): conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client) job._properties['status'] = {'state': 'RUNNING'} with self.assertRaises(ValueError): @@ -595,8 +588,8 @@ def test_begin_w_bound_client(self): del RESOURCE['user_email'] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client) job.begin() @@ -634,9 +627,10 @@ def test_begin_w_autodetect(self): del resource['user_email'] conn = _Connection(resource) client = _Client(project=self.PROJECT, connection=conn) - table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) - job.autodetect = 
True + config = LoadJobConfig() + config.autodetect = True + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client, config) job.begin() sent = { @@ -698,24 +692,24 @@ def test_begin_w_alternate_client(self): client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection(RESOURCE) client2 = _Client(project=self.PROJECT, connection=conn2) - table = _Table() full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') age = SchemaField('age', 'INTEGER', mode='REQUIRED') - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client1, - schema=[full_name, age]) - - job.allow_jagged_rows = True - job.allow_quoted_newlines = True - job.create_disposition = 'CREATE_NEVER' - job.encoding = 'ISO-8559-1' - job.field_delimiter = '|' - job.ignore_unknown_values = True - job.max_bad_records = 100 - job.null_marker = r'\N' - job.quote_character = "'" - job.skip_leading_rows = 1 - job.source_format = 'CSV' - job.write_disposition = 'WRITE_TRUNCATE' + config = LoadJobConfig() + config.schema = [full_name, age] + job = self._make_one(self.JOB_NAME, [self.SOURCE1], self.TABLE_REF, + client1, config) + config.allow_jagged_rows = True + config.allow_quoted_newlines = True + config.create_disposition = 'CREATE_NEVER' + config.encoding = 'ISO-8559-1' + config.field_delimiter = '|' + config.ignore_unknown_values = True + config.max_bad_records = 100 + config.null_marker = r'\N' + config.quote_character = "'" + config.skip_leading_rows = 1 + config.source_format = 'CSV' + config.write_disposition = 'WRITE_TRUNCATE' job.begin(client=client2) @@ -733,6 +727,7 @@ def test_begin_w_alternate_client(self): 'load': LOAD_CONFIGURATION, }, } + self.maxDiff = None self.assertEqual(req['data'], SENT) self._verifyResourceProperties(job, RESOURCE) @@ -741,7 +736,7 @@ def test_exists_miss_w_bound_client(self): conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client) self.assertFalse(job.exists()) @@ -758,7 +753,7 @@ def test_exists_hit_w_alternate_client(self): conn2 = _Connection({}) client2 = _Client(project=self.PROJECT, connection=conn2) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client1) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client1) self.assertTrue(job.exists(client=client2)) @@ -775,7 +770,7 @@ def test_reload_w_bound_client(self): conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client) job.reload() @@ -793,7 +788,7 @@ def test_reload_w_alternate_client(self): conn2 = _Connection(RESOURCE) client2 = _Client(project=self.PROJECT, connection=conn2) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client1) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client1) job.reload(client=client2) @@ -811,7 +806,7 @@ def test_cancel_w_bound_client(self): conn = _Connection(RESPONSE) client = _Client(project=self.PROJECT, connection=conn) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client) job.cancel() @@ -830,7 +825,7 @@ def test_cancel_w_alternate_client(self): conn2 = _Connection(RESPONSE) client2 = _Client(project=self.PROJECT, 
connection=conn2) table = _Table() - job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client1) + job = self._make_one(self.JOB_NAME, [self.SOURCE1], table, client1) job.cancel(client=client2)
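
A minimal sketch of the ``LoadJobConfig`` round trip these tests rely on
(the values are illustrative): ``to_api_repr()`` serializes
``skipLeadingRows`` as a string, since the API defines it as an int64 that
cannot be represented as a JSON number, and ``from_api_repr()`` (with the
``return config`` above) converts it back:

    from google.cloud.bigquery import LoadJobConfig

    config = LoadJobConfig()
    config.skip_leading_rows = 1
    config.write_disposition = 'WRITE_TRUNCATE'

    resource = config.to_api_repr()
    # {'skipLeadingRows': '1', 'writeDisposition': 'WRITE_TRUNCATE'}

    restored = LoadJobConfig.from_api_repr(resource)
    assert restored.skip_leading_rows == 1
    assert restored.write_disposition == 'WRITE_TRUNCATE'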