Skip to content

Commit

Permalink
BigQuery: add client.query_rows(), remove client.run_sync_query(). (#4065)

Browse files Browse the repository at this point in the history

* BigQuery: add client.query_rows(), remove client.run_sync_query().

The query_rows() method will be the new way to run a query
synchronously: it starts a query job, waits for the job to
complete, and returns the resulting rows.
  • Loading branch information
tswast authored Sep 27, 2017
1 parent 11a8916 commit e028c38
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 130 deletions.
2 changes: 2 additions & 0 deletions bigquery/google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table

Expand All @@ -45,6 +46,7 @@
'Dataset',
'CopyJobConfig',
'ExtractJobConfig',
'QueryJobConfig',
'ScalarQueryParameter',
'SchemaField',
'StructQueryParameter',
Expand Down
50 changes: 34 additions & 16 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.query import QueryResults


Expand Down Expand Up @@ -612,29 +613,46 @@ def run_async_query(self, job_id, query,
udf_resources=udf_resources,
query_parameters=query_parameters)

def run_sync_query(self, query, udf_resources=(), query_parameters=()):
"""Run a SQL query synchronously.
def query_rows(self, query, job_config=None, job_id=None, timeout=None):
"""Start a query job and wait for the results.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
:type query: str
:param query: SQL query to be executed
:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`google.cloud.bigquery._helpers.UDFResource`
(empty by default)
:type job_id: str
:param job_id: (Optional) ID to use for the query job.
:type query_parameters: tuple
:param query_parameters:
An iterable of
:class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
(empty by default)
:type timeout: int
:param timeout:
(Optional) How long to wait for job to complete before raising a
:class:`TimeoutError`.
:rtype: :class:`google.cloud.bigquery.query.QueryResults`
:returns: a new ``QueryResults`` instance
:rtype: :class:`~google.api.core.page_iterator.Iterator`
:returns:
Iterator of row data :class:`tuple`s. During each page, the
iterator will have the ``total_rows`` attribute set, which counts
the total number of rows **in the result set** (this is distinct
from the total number of rows in the current page:
``iterator.page.num_items``).
:raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
failed or :class:`TimeoutError` if the job did not complete in the
given timeout.
"""
return QueryResults(query, client=self,
udf_resources=udf_resources,
query_parameters=query_parameters)
job_id = _make_job_id(job_id)

# TODO(swast): move standard SQL default to QueryJobConfig class.
if job_config is None:
job_config = QueryJobConfig()
if job_config.use_legacy_sql is None:
job_config.use_legacy_sql = False

job = QueryJob(job_id, query, client=self, job_config=job_config)
job.begin()
return job.result(timeout=timeout)


# pylint: disable=unused-argument
Expand Down
163 changes: 116 additions & 47 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,11 +1218,52 @@ def from_api_repr(cls, resource, client):
return job


class _AsyncQueryConfiguration(object):
"""User-settable configuration options for asynchronous query jobs.
class QueryJobConfig(object):
"""Configuration options for query jobs.
Values which are ``None`` -> server defaults.
All properties in this class are optional. Values which are ``None`` ->
server defaults.
"""

def __init__(self):
self._properties = {}

def to_api_repr(self):
"""Build an API representation of the copy job config.
:rtype: dict
:returns: A dictionary in the format used by the BigQuery API.
"""
return copy.deepcopy(self._properties)

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
:type resource: dict
:param resource:
An extract job configuration in the same representation as is
returned from the API.
:rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig`
:returns: Configuration parsed from ``resource``.
"""
config = cls()
config._properties = copy.deepcopy(resource)
return config

use_legacy_sql = _TypedApiResourceProperty(
'use_legacy_sql', 'useLegacySql', bool)
"""See
https://cloud.google.com/bigquery/docs/\
reference/v2/jobs#configuration.query.useLegacySql
"""

dry_run = _TypedApiResourceProperty('dry_run', 'dryRun', bool)
"""See
https://g.co/cloud/bigquery/docs/reference/v2/jobs#configuration.dryRun
"""

_allow_large_results = None
_create_disposition = None
_default_dataset = None
Expand All @@ -1231,7 +1272,6 @@ class _AsyncQueryConfiguration(object):
_priority = None
_use_query_cache = None
_use_legacy_sql = None
_dry_run = None
_write_disposition = None
_maximum_billing_tier = None
_maximum_bytes_billed = None
Expand Down Expand Up @@ -1260,20 +1300,60 @@ class QueryJob(_AsyncJob):
An iterable of
:class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
(empty by default)
:type job_config: :class:`~google.cloud.bigquery.job.QueryJobConfig`
:param job_config:
(Optional) Extra configuration options for the query job.
"""
_JOB_TYPE = 'query'
_UDF_KEY = 'userDefinedFunctionResources'
_QUERY_PARAMETERS_KEY = 'queryParameters'

def __init__(self, job_id, query, client,
udf_resources=(), query_parameters=()):
udf_resources=(), query_parameters=(), job_config=None):
super(QueryJob, self).__init__(job_id, client)

if job_config is None:
job_config = QueryJobConfig()

self.query = query
self.udf_resources = udf_resources
self.query_parameters = query_parameters
self._configuration = _AsyncQueryConfiguration()
self._configuration = job_config
self._query_results = None

@property
def use_legacy_sql(self):
"""See
:class:`~google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
"""
return self._configuration.use_legacy_sql

@use_legacy_sql.setter
def use_legacy_sql(self, value):
"""See
:class:`~google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
"""
# TODO(swast): remove this method and only allow setting use_legacy_sql
# on QueryJobConfig objects.
self._configuration.use_legacy_sql = value

@property
def dry_run(self):
"""See
:class:`~google.cloud.bigquery.job.QueryJobConfig.dry_run`.
"""
return self._configuration.dry_run

@dry_run.setter
def dry_run(self, value):
"""See
:class:`~google.cloud.bigquery.job.QueryJobConfig.dry_run`.
"""
# TODO(swast): remove this method and only allow setting dry_run
# on QueryJobConfig objects.
self._configuration.dry_run = value

allow_large_results = _TypedProperty('allow_large_results', bool)
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults
Expand Down Expand Up @@ -1314,20 +1394,8 @@ def __init__(self, job_id, query, client,
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.useQueryCache
"""

use_legacy_sql = _TypedProperty('use_legacy_sql', bool)
"""See
https://cloud.google.com/bigquery/docs/\
reference/v2/jobs#configuration.query.useLegacySql
"""

dry_run = _TypedProperty('dry_run', bool)
"""See
https://cloud.google.com/bigquery/docs/\
reference/rest/v2/jobs#configuration.dryRun
"""

write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
write_disposition = WriteDisposition(
'write_disposition', 'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
"""
Expand Down Expand Up @@ -1363,8 +1431,6 @@ def _populate_config_resource_booleans(self, configuration):
configuration['flattenResults'] = self.flatten_results
if self.use_query_cache is not None:
configuration['useQueryCache'] = self.use_query_cache
if self.use_legacy_sql is not None:
configuration['useLegacySql'] = self.use_legacy_sql

def _populate_config_resource(self, configuration):
"""Helper for _build_resource: copy config properties to resource"""
Expand All @@ -1377,8 +1443,8 @@ def _populate_config_resource(self, configuration):
'projectId': self.default_dataset.project,
'datasetId': self.default_dataset.dataset_id,
}
if self.destination is not None:
table_res = self._destination_table_resource()
table_res = self._destination_table_resource()
if table_res is not None:
configuration['destinationTable'] = table_res
if self.priority is not None:
configuration['priority'] = self.priority
Expand Down Expand Up @@ -1406,23 +1472,26 @@ def _populate_config_resource(self, configuration):

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
configuration = self._configuration.to_api_repr()

resource = {
'jobReference': {
'projectId': self.project,
'jobId': self.job_id,
},
'configuration': {
self._JOB_TYPE: {
'query': self.query,
},
self._JOB_TYPE: configuration,
},
}

if self.dry_run is not None:
resource['configuration']['dryRun'] = self.dry_run
# The dryRun property only applies to query jobs, but it is defined at
# a level higher up. We need to remove it from the query config.
if 'dryRun' in configuration:
dry_run = configuration['dryRun']
del configuration['dryRun']
resource['configuration']['dryRun'] = dry_run

configuration = resource['configuration'][self._JOB_TYPE]
configuration['query'] = self.query
self._populate_config_resource(configuration)

return resource
Expand All @@ -1436,19 +1505,28 @@ def _scrub_local_properties(self, cleaned):
the client's project.
"""
configuration = cleaned['configuration']['query']

self.query = configuration['query']

# The dryRun property only applies to query jobs, but it is defined at
# a level higher up. We need to copy it to the query config.
self._configuration.dry_run = cleaned['configuration'].get('dryRun')

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
# The dryRun property only applies to query jobs, but it is defined at
# a level higher up. We need to copy it to the query config.
# It should already be correctly set by the _scrub_local_properties()
# method.
dry_run = self.dry_run
self._configuration = QueryJobConfig.from_api_repr(configuration)
self._configuration.dry_run = dry_run

self.allow_large_results = _bool_or_none(
configuration.get('allowLargeResults'))
self.flatten_results = _bool_or_none(
configuration.get('flattenResults'))
self.use_query_cache = _bool_or_none(
configuration.get('useQueryCache'))
self.use_legacy_sql = _bool_or_none(
configuration.get('useLegacySql'))

self.create_disposition = configuration.get('createDisposition')
self.priority = configuration.get('priority')
Expand All @@ -1459,22 +1537,13 @@ def _copy_configuration_properties(self, configuration):

dest_remote = configuration.get('destinationTable')

if dest_remote is None:
if self.destination is not None:
del self.destination
else:
dest_local = self._destination_table_resource()
if dest_remote != dest_local:
project = dest_remote['projectId']
dataset = Dataset(DatasetReference(project,
dest_remote['datasetId']))
self.destination = dataset.table(dest_remote['tableId'])
if dest_remote is not None:
dataset = DatasetReference(
dest_remote['projectId'], dest_remote['datasetId'])
self.destination = dataset.table(dest_remote['tableId'])

def_ds = configuration.get('defaultDataset')
if def_ds is None:
if self.default_dataset is not None:
del self.default_dataset
else:
if def_ds is not None:
self.default_dataset = DatasetReference(
def_ds['projectId'], def_ds['datasetId'])
udf_resources = []
Expand Down
1 change: 1 addition & 0 deletions bigquery/google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
next_token='pageToken',
extra_params=params)
iterator.query_result = self
iterator.job = self.job
return iterator


Expand Down
Loading

0 comments on commit e028c38

Please sign in to comment.