BigQuery: removes Client.query_rows() (#4429)
alixhami authored and tswast committed Nov 21, 2017
1 parent 58b642e commit b8cde46
Showing 5 changed files with 46 additions and 370 deletions.
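
For anyone migrating off the removed method, the pattern used throughout this commit is query() followed by result(). A minimal before/after sketch, assuming an already constructed client; the SQL string is illustrative, not from this commit:

from google.cloud import bigquery

client = bigquery.Client()
sql = 'SELECT 1'  # illustrative query

# Before this commit: one call started the job and waited for the rows.
# rows = client.query_rows(sql)

# After this commit: start the job, then wait for its rows explicitly.
query_job = client.query(sql)  # returns a QueryJob, which is a future
rows = query_job.result()      # blocks until the job completes
for row in rows:
    print(row)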
68 changes: 2 additions & 66 deletions bigquery/google/cloud/bigquery/client.py
@@ -17,7 +17,6 @@
 from __future__ import absolute_import
 
 import collections
-import concurrent.futures
 import functools
 import os
 import uuid
@@ -29,8 +28,6 @@
 from google.resumable_media.requests import ResumableUpload
 
 from google.api_core import page_iterator
-from google.api_core.exceptions import GoogleAPICallError
-from google.api_core.exceptions import NotFound
 from google.cloud import exceptions
 from google.cloud.client import ClientWithProject
 
@@ -1144,67 +1141,6 @@ def create_rows_json(self, table, json_rows, row_ids=None,
 
         return errors
 
-    def query_rows(
-            self, query, job_config=None, job_id=None, job_id_prefix=None,
-            timeout=None, retry=DEFAULT_RETRY):
-        """Start a query job and wait for the results.
-
-        See
-        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
-
-        :type query: str
-        :param query:
-            SQL query to be executed. Defaults to the standard SQL dialect.
-            Use the ``job_config`` parameter to change dialects.
-
-        :type job_config: :class:`google.cloud.bigquery.job.QueryJobConfig`
-        :param job_config: (Optional) Extra configuration options for the job.
-
-        :type job_id: str
-        :param job_id: (Optional) ID to use for the query job.
-
-        :type job_id_prefix: str or ``NoneType``
-        :param job_id_prefix: (Optional) the user-provided prefix for a
-                              randomly generated job ID. This parameter will be
-                              ignored if a ``job_id`` is also given.
-
-        :type timeout: float
-        :param timeout:
-            (Optional) How long (in seconds) to wait for job to complete
-            before raising a :class:`concurrent.futures.TimeoutError`.
-
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
-        :returns:
-            Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
-            During each page, the iterator will have the ``total_rows``
-            attribute set, which counts the total number of rows **in the
-            result set** (this is distinct from the total number of rows in
-            the current page: ``iterator.page.num_items``).
-
-        :raises:
-            :class:`~google.api_core.exceptions.GoogleAPICallError` if the
-            job failed or :class:`concurrent.futures.TimeoutError` if the job
-            did not complete in the given timeout.
-
-            When an exception happens, the query job will be cancelled on a
-            best-effort basis.
-        """
-        job_id = _make_job_id(job_id, job_id_prefix)
-
-        try:
-            job = self.query(
-                query, job_config=job_config, job_id=job_id, retry=retry)
-            rows_iterator = job.result(timeout=timeout)
-        except (GoogleAPICallError, concurrent.futures.TimeoutError):
-            try:
-                self.cancel_job(job_id)
-            except NotFound:
-                # It's OK if couldn't cancel because job never got created.
-                pass
-            raise
-
-        return rows_iterator
-
     def list_rows(self, table, selected_fields=None, max_results=None,
                   page_token=None, start_index=None, retry=DEFAULT_RETRY):
         """List the rows of the table.
@@ -1303,12 +1239,12 @@ def list_partitions(self, table, retry=DEFAULT_RETRY):
         """
         config = QueryJobConfig()
         config.use_legacy_sql = True  # required for '$' syntax
-        rows = self.query_rows(
+        query_job = self.query(
             'SELECT partition_id from [%s:%s.%s$__PARTITIONS_SUMMARY__]' %
             (table.project, table.dataset_id, table.table_id),
             job_config=config,
             retry=retry)
-        return [row[0] for row in rows]
+        return [row[0] for row in query_job]
 
 
 # pylint: disable=unused-argument
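
Note that query_rows() also cancelled the job on a best-effort basis when waiting failed or timed out, per its removed docstring; result() does not do that for you. A sketch of reproducing that semantic with the public API, where query_rows_compat is a hypothetical helper name and not part of the library:

import concurrent.futures
import uuid

from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import NotFound


def query_rows_compat(client, query, job_config=None, timeout=None):
    """Mirror the removed Client.query_rows() cancel-on-failure behavior."""
    job_id = str(uuid.uuid4())  # a random ID, like the removed helper's default
    try:
        job = client.query(query, job_config=job_config, job_id=job_id)
        return job.result(timeout=timeout)
    except (GoogleAPICallError, concurrent.futures.TimeoutError):
        try:
            client.cancel_job(job_id)
        except NotFound:
            # It's OK if we couldn't cancel; the job never got created.
            pass
        raise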
32 changes: 17 additions & 15 deletions bigquery/tests/system.py
@@ -673,7 +673,7 @@ def test_job_cancel(self):
         # raise an error, and that the job completed (in the `retry()`
         # above).
 
-    def test_query_rows_w_legacy_sql_types(self):
+    def test_query_w_legacy_sql_types(self):
         naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
         stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat())
         zoned = naive.replace(tzinfo=UTC)
@@ -706,7 +706,7 @@ def test_query_rows_w_legacy_sql_types(self):
         for example in examples:
             job_config = bigquery.QueryJobConfig()
             job_config.use_legacy_sql = True
-            rows = list(Config.CLIENT.query_rows(
+            rows = list(Config.CLIENT.query(
                 example['sql'], job_config=job_config))
             self.assertEqual(len(rows), 1)
             self.assertEqual(len(rows[0]), 1)
@@ -806,26 +806,28 @@ def _generate_standard_sql_types_examples(self):
             },
         ]
 
-    def test_query_rows_w_standard_sql_types(self):
+    def test_query_w_standard_sql_types(self):
         examples = self._generate_standard_sql_types_examples()
         for example in examples:
-            rows = list(Config.CLIENT.query_rows(example['sql']))
+            rows = list(Config.CLIENT.query(example['sql']))
             self.assertEqual(len(rows), 1)
             self.assertEqual(len(rows[0]), 1)
             self.assertEqual(rows[0][0], example['expected'])
 
-    def test_query_rows_w_failed_query(self):
+    def test_query_w_failed_query(self):
         from google.api_core.exceptions import BadRequest
 
         with self.assertRaises(BadRequest):
-            Config.CLIENT.query_rows('invalid syntax;')
+            Config.CLIENT.query('invalid syntax;').result()
 
+    def test_query_w_timeout(self):
+        query_job = Config.CLIENT.query(
+            'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
+            job_id_prefix='test_query_w_timeout_')
+
-    def test_query_rows_w_timeout(self):
         with self.assertRaises(concurrent.futures.TimeoutError):
-            Config.CLIENT.query_rows(
-                'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
-                job_id_prefix='test_query_rows_w_timeout_',
-                timeout=1)  # 1 second is much too short for this query.
+            # 1 second is much too short for this query.
+            query_job.result(timeout=1)
 
     def test_dbapi_w_standard_sql_types(self):
         examples = self._generate_standard_sql_types_examples()
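
As the updated tests show, failures and timeouts now surface from result() rather than from the client call itself: invalid SQL raises google.api_core.exceptions.BadRequest when result() is called, and a too-short timeout raises concurrent.futures.TimeoutError. A sketch of the timeout case, assuming the same public-dataset query as the test; the job_id_prefix value is illustrative, and the cancel step is optional rather than something result() does:

import concurrent.futures

from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(
    'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
    job_id_prefix='example_timeout_')
try:
    rows = query_job.result(timeout=1)  # much too short for this query
except concurrent.futures.TimeoutError:
    # The removed query_rows() cancelled automatically on timeout;
    # callers that still want that behavior must cancel explicitly.
    query_job.cancel()
    raise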
@@ -1224,9 +1226,9 @@ def test_large_query_w_public_data(self):
         SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
             PUBLIC, DATASET_ID, TABLE_NAME, LIMIT)
 
-        iterator = Config.CLIENT.query_rows(SQL)
+        query_job = Config.CLIENT.query(SQL)
 
-        rows = list(iterator)
+        rows = list(query_job)
         self.assertEqual(len(rows), LIMIT)
 
     def test_query_future(self):
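
Both the test above and the list_partitions() change in client.py lean on the fact that iterating a QueryJob waits for the job to finish and yields its rows, so the job object can stand in for the old rows iterator. A short sketch; the project, dataset, and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()
config = bigquery.QueryJobConfig()
config.use_legacy_sql = True  # '$__PARTITIONS_SUMMARY__' needs legacy SQL

query_job = client.query(
    'SELECT partition_id from '
    '[my-project:my_dataset.my_table$__PARTITIONS_SUMMARY__]',
    job_config=config)
partition_ids = [row[0] for row in query_job]  # iterating blocks until done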
@@ -1256,7 +1258,7 @@ def test_query_table_def(self):
         job_config.table_definitions = {table_id: ec}
         sql = 'SELECT * FROM %s' % table_id
 
-        got_rows = Config.CLIENT.query_rows(sql, job_config=job_config)
+        got_rows = Config.CLIENT.query(sql, job_config=job_config)
 
         row_tuples = [r.values() for r in got_rows]
         by_age = operator.itemgetter(1)
@@ -1280,7 +1282,7 @@ def test_query_external_table(self):
 
         sql = 'SELECT * FROM %s.%s' % (dataset_id, table_id)
 
-        got_rows = Config.CLIENT.query_rows(sql)
+        got_rows = Config.CLIENT.query(sql)
 
         row_tuples = [r.values() for r in got_rows]
         by_age = operator.itemgetter(1)
(Diffs for the remaining 3 changed files were not loaded.)
