Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'Client.get_job' API wrapper. #3804

Merged
merged 6 commits into from
Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigquery/.coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ exclude_lines =
pragma: NO COVER
# Ignore debug-only repr
def __repr__
# Ignore abstract methods
raise NotImplementedError
38 changes: 38 additions & 0 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ def _validate(self, value):

:raises: ValueError on a type mismatch.
"""
if value is None:
return
if not isinstance(value, self.property_type):
raise ValueError('Required type: %s' % (self.property_type,))

Expand Down Expand Up @@ -396,6 +398,14 @@ def __init__(self, name, type_, value):
self.type_ = type_
self.value = value

def __eq__(self, other):
if not isinstance(other, ScalarQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.type_ == other.type_ and
self.value == other.value)

@classmethod
def positional(cls, type_, value):
"""Factory for positional paramater.
Expand Down Expand Up @@ -473,6 +483,14 @@ def __init__(self, name, array_type, values):
self.array_type = array_type
self.values = values

def __eq__(self, other):
if not isinstance(other, ArrayQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.array_type == other.array_type and
self.values == other.values)

@classmethod
def positional(cls, array_type, values):
"""Factory for positional parameters.
Expand Down Expand Up @@ -566,6 +584,14 @@ def __init__(self, name, *sub_params):
types[sub.name] = sub.type_
values[sub.name] = sub.value

def __eq__(self, other):
if not isinstance(other, StructQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.struct_types == other.struct_types and
self.struct_values == other.struct_values)

@classmethod
def positional(cls, *sub_params):
"""Factory for positional parameters.
Expand Down Expand Up @@ -636,6 +662,18 @@ def to_api_repr(self):
return resource


def _query_param_from_api_repr(resource):
"""Helper: construct concrete query parameter from JSON resource."""
qp_type = resource['parameterType']
if 'arrayType' in qp_type:
klass = ArrayQueryParameter
elif 'structTypes' in qp_type:
klass = StructQueryParameter
else:
klass = ScalarQueryParameter
return klass.from_api_repr(resource)


class QueryParametersProperty(object):
"""Custom property type, holding query parameter instances."""

Expand Down
61 changes: 45 additions & 16 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,35 @@ def job_from_resource(self, resource):
return QueryJob.from_api_repr(resource, self)
raise ValueError('Cannot parse job resource')

def get_job(self, job_id, project=None):
"""Fetch a job for the project associated with this client.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

:type job_id: str
:param job_id: Name of the job.

:type project: str
:param project:
project ID owning the job (defaults to the client's project)

:rtype: :class:`~google.cloud.bigquery.job._AsyncJob`
:returns:
Concrete job instance, based on the resource returned by the API.
"""
extra_params = {'projection': 'full'}

if project is None:
project = self.project

path = '/projects/{}/jobs/{}'.format(project, job_id)

resource = self._connection.api_request(
method='GET', path=path, query_params=extra_params)

return self.job_from_resource(resource)

def list_jobs(self, max_results=None, page_token=None, all_users=None,
state_filter=None):
"""List jobs for the project associated with this client.
Expand Down Expand Up @@ -237,14 +266,14 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
max_results=max_results,
extra_params=extra_params)

def load_table_from_storage(self, job_name, destination, *source_uris):
def load_table_from_storage(self, job_id, destination, *source_uris):
"""Construct a job for loading data into a table from CloudStorage.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load

:type job_name: str
:param job_name: Name of the job.
:type job_id: str
:param job_id: Name of the job.

:type destination: :class:`google.cloud.bigquery.table.Table`
:param destination: Table into which data is to be loaded.
Expand All @@ -256,16 +285,16 @@ def load_table_from_storage(self, job_name, destination, *source_uris):
:rtype: :class:`google.cloud.bigquery.job.LoadJob`
:returns: a new ``LoadJob`` instance
"""
return LoadJob(job_name, destination, source_uris, client=self)
return LoadJob(job_id, destination, source_uris, client=self)

def copy_table(self, job_name, destination, *sources):
def copy_table(self, job_id, destination, *sources):
"""Construct a job for copying one or more tables into another table.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy

:type job_name: str
:param job_name: Name of the job.
:type job_id: str
:param job_id: Name of the job.

:type destination: :class:`google.cloud.bigquery.table.Table`
:param destination: Table into which data is to be copied.
Expand All @@ -276,16 +305,16 @@ def copy_table(self, job_name, destination, *sources):
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: a new ``CopyJob`` instance
"""
return CopyJob(job_name, destination, sources, client=self)
return CopyJob(job_id, destination, sources, client=self)

def extract_table_to_storage(self, job_name, source, *destination_uris):
def extract_table_to_storage(self, job_id, source, *destination_uris):
"""Construct a job for extracting a table into Cloud Storage files.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract

:type job_name: str
:param job_name: Name of the job.
:type job_id: str
:param job_id: Name of the job.

:type source: :class:`google.cloud.bigquery.table.Table`
:param source: table to be extracted.
Expand All @@ -298,17 +327,17 @@ def extract_table_to_storage(self, job_name, source, *destination_uris):
:rtype: :class:`google.cloud.bigquery.job.ExtractJob`
:returns: a new ``ExtractJob`` instance
"""
return ExtractJob(job_name, source, destination_uris, client=self)
return ExtractJob(job_id, source, destination_uris, client=self)

def run_async_query(self, job_name, query,
def run_async_query(self, job_id, query,
udf_resources=(), query_parameters=()):
"""Construct a job for running a SQL query asynchronously.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query

:type job_name: str
:param job_name: Name of the job.
:type job_id: str
:param job_id: Name of the job.

:type query: str
:param query: SQL query to be executed
Expand All @@ -327,7 +356,7 @@ def run_async_query(self, job_name, query,
:rtype: :class:`google.cloud.bigquery.job.QueryJob`
:returns: a new ``QueryJob`` instance
"""
return QueryJob(job_name, query, client=self,
return QueryJob(job_id, query, client=self,
udf_resources=udf_resources,
query_parameters=query_parameters)

Expand Down
106 changes: 103 additions & 3 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
from google.cloud.bigquery._helpers import QueryParametersProperty
from google.cloud.bigquery._helpers import ScalarQueryParameter
from google.cloud.bigquery._helpers import StructQueryParameter
from google.cloud.bigquery._helpers import UDFResource
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _query_param_from_api_repr
from google.cloud.bigquery._helpers import _TypedProperty

_DONE_STATE = 'DONE'
Expand Down Expand Up @@ -61,6 +63,22 @@
}


def _bool_or_none(value):
"""Helper: deserialize boolean value from JSON string."""
if isinstance(value, bool):
return value
if value is not None:
return value.lower() in ['t', 'true', '1']


def _int_or_none(value):
"""Helper: deserialize int value from JSON string."""
if isinstance(value, int):
return value
if value is not None:
return int(value)


def _error_result_to_exception(error_result):
"""Maps BigQuery error reasons to an exception.

Expand Down Expand Up @@ -311,6 +329,10 @@ def _scrub_local_properties(self, cleaned):
"""Helper: handle subclass properties in cleaned."""
pass

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
raise NotImplementedError("Abstract")

def _set_properties(self, api_response):
"""Update properties from resource in body of ``api_response``

Expand All @@ -330,6 +352,8 @@ def _set_properties(self, api_response):

self._properties.clear()
self._properties.update(cleaned)
configuration = cleaned['configuration'][self._JOB_TYPE]
self._copy_configuration_properties(configuration)

# For Future interface
self._set_future_result()
Expand Down Expand Up @@ -731,7 +755,7 @@ def _populate_config_resource(self, configuration):
if self.quote_character is not None:
configuration['quote'] = self.quote_character
if self.skip_leading_rows is not None:
configuration['skipLeadingRows'] = self.skip_leading_rows
configuration['skipLeadingRows'] = str(self.skip_leading_rows)
if self.source_format is not None:
configuration['sourceFormat'] = self.source_format
if self.write_disposition is not None:
Expand Down Expand Up @@ -769,6 +793,28 @@ def _scrub_local_properties(self, cleaned):
schema = cleaned.pop('schema', {'fields': ()})
self.schema = _parse_schema_resource(schema)

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.allow_jagged_rows = _bool_or_none(
configuration.get('allowJaggedRows'))
self.allow_quoted_newlines = _bool_or_none(
configuration.get('allowQuotedNewlines'))
self.autodetect = _bool_or_none(
configuration.get('autodetect'))
self.create_disposition = configuration.get('createDisposition')
self.encoding = configuration.get('encoding')
self.field_delimiter = configuration.get('fieldDelimiter')
self.ignore_unknown_values = _bool_or_none(
configuration.get('ignoreUnknownValues'))
self.max_bad_records = _int_or_none(
configuration.get('maxBadRecords'))
self.null_marker = configuration.get('nullMarker')
self.quote_character = configuration.get('quote')
self.skip_leading_rows = _int_or_none(

This comment was marked as spam.

This comment was marked as spam.

configuration.get('skipLeadingRows'))
self.source_format = configuration.get('sourceFormat')
self.write_disposition = configuration.get('writeDisposition')

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a job given its API representation
Expand Down Expand Up @@ -879,6 +925,11 @@ def _build_resource(self):

return resource

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.create_disposition = configuration.get('createDisposition')
self.write_disposition = configuration.get('writeDisposition')

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a job given its API representation
Expand Down Expand Up @@ -1012,6 +1063,14 @@ def _build_resource(self):

return resource

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.compression = configuration.get('compression')
self.destination_format = configuration.get('destinationFormat')
self.field_delimiter = configuration.get('fieldDelimiter')
self.print_header = _bool_or_none(
configuration.get('printHeader'))

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a job given its API representation
Expand Down Expand Up @@ -1207,7 +1266,8 @@ def _populate_config_resource(self, configuration):
if self.maximum_billing_tier is not None:
configuration['maximumBillingTier'] = self.maximum_billing_tier
if self.maximum_bytes_billed is not None:
configuration['maximumBytesBilled'] = self.maximum_bytes_billed
configuration['maximumBytesBilled'] = str(
self.maximum_bytes_billed)
if len(self._udf_resources) > 0:
configuration[self._UDF_KEY] = [
{udf_resource.udf_type: udf_resource.value}
Expand Down Expand Up @@ -1257,6 +1317,25 @@ def _scrub_local_properties(self, cleaned):
configuration = cleaned['configuration']['query']

self.query = configuration['query']

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.allow_large_results = _bool_or_none(
configuration.get('allowLargeResults'))
self.flatten_results = _bool_or_none(
configuration.get('flattenResults'))
self.use_query_cache = _bool_or_none(
configuration.get('useQueryCache'))
self.use_legacy_sql = _bool_or_none(
configuration.get('useLegacySql'))

self.create_disposition = configuration.get('createDisposition')
self.priority = configuration.get('priority')
self.write_disposition = configuration.get('writeDisposition')
self.maximum_billing_tier = configuration.get('maximumBillingTier')
self.maximum_bytes_billed = _int_or_none(
configuration.get('maximumBytesBilled'))

dest_remote = configuration.get('destinationTable')

if dest_remote is None:
Expand All @@ -1265,9 +1344,30 @@ def _scrub_local_properties(self, cleaned):
else:
dest_local = self._destination_table_resource()
if dest_remote != dest_local:
dataset = self._client.dataset(dest_remote['datasetId'])
project = dest_remote['projectId']
dataset = self._client.dataset(
dest_remote['datasetId'], project=project)
self.destination = dataset.table(dest_remote['tableId'])

def_ds = configuration.get('defaultDataset')
if def_ds is None:
if self.default_dataset is not None:
del self.default_dataset
else:
project = def_ds['projectId']
self.default_dataset = self._client.dataset(def_ds['datasetId'])

udf_resources = []
for udf_mapping in configuration.get(self._UDF_KEY, ()):
key_val, = udf_mapping.items()
udf_resources.append(UDFResource(key_val[0], key_val[1]))
self._udf_resources = udf_resources

self._query_parameters = [
_query_param_from_api_repr(mapping)
for mapping in configuration.get(self._QUERY_PARAMETERS_KEY, ())
]

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a job given its API representation
Expand Down
Loading