From b8cde46273fdf6760d8d0bad5329f02fdb81dca8 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 21 Nov 2017 14:14:45 -0800 Subject: [PATCH] BigQuery: removes Client.query_rows() (#4429) --- bigquery/google/cloud/bigquery/client.py | 68 +----- bigquery/tests/system.py | 32 +-- bigquery/tests/unit/test_client.py | 255 ----------------------- docs/bigquery/snippets.py | 33 ++- docs/bigquery/usage.rst | 28 +-- 5 files changed, 46 insertions(+), 370 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index c12979e98f30..757fe6bfd229 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -17,7 +17,6 @@ from __future__ import absolute_import import collections -import concurrent.futures import functools import os import uuid @@ -29,8 +28,6 @@ from google.resumable_media.requests import ResumableUpload from google.api_core import page_iterator -from google.api_core.exceptions import GoogleAPICallError -from google.api_core.exceptions import NotFound from google.cloud import exceptions from google.cloud.client import ClientWithProject @@ -1144,67 +1141,6 @@ def create_rows_json(self, table, json_rows, row_ids=None, return errors - def query_rows( - self, query, job_config=None, job_id=None, job_id_prefix=None, - timeout=None, retry=DEFAULT_RETRY): - """Start a query job and wait for the results. - - See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query - - :type query: str - :param query: - SQL query to be executed. Defaults to the standard SQL dialect. - Use the ``job_config`` parameter to change dialects. - - :type job_config: :class:`google.cloud.bigquery.job.QueryJobConfig` - :param job_config: (Optional) Extra configuration options for the job. - - :type job_id: str - :param job_id: (Optional) ID to use for the query job. - - :type job_id_prefix: str or ``NoneType`` - :param job_id_prefix: (Optional) the user-provided prefix for a - randomly generated job ID. This parameter will be - ignored if a ``job_id`` is also given. - - :type timeout: float - :param timeout: - (Optional) How long (in seconds) to wait for job to complete - before raising a :class:`concurrent.futures.TimeoutError`. - - :rtype: :class:`~google.api_core.page_iterator.Iterator` - :returns: - Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s. - During each page, the iterator will have the ``total_rows`` - attribute set, which counts the total number of rows **in the - result set** (this is distinct from the total number of rows in - the current page: ``iterator.page.num_items``). - - :raises: - :class:`~google.api_core.exceptions.GoogleAPICallError` if the - job failed or :class:`concurrent.futures.TimeoutError` if the job - did not complete in the given timeout. - - When an exception happens, the query job will be cancelled on a - best-effort basis. - """ - job_id = _make_job_id(job_id, job_id_prefix) - - try: - job = self.query( - query, job_config=job_config, job_id=job_id, retry=retry) - rows_iterator = job.result(timeout=timeout) - except (GoogleAPICallError, concurrent.futures.TimeoutError): - try: - self.cancel_job(job_id) - except NotFound: - # It's OK if couldn't cancel because job never got created. - pass - raise - - return rows_iterator - def list_rows(self, table, selected_fields=None, max_results=None, page_token=None, start_index=None, retry=DEFAULT_RETRY): """List the rows of the table. @@ -1303,12 +1239,12 @@ def list_partitions(self, table, retry=DEFAULT_RETRY): """ config = QueryJobConfig() config.use_legacy_sql = True # required for '$' syntax - rows = self.query_rows( + query_job = self.query( 'SELECT partition_id from [%s:%s.%s$__PARTITIONS_SUMMARY__]' % (table.project, table.dataset_id, table.table_id), job_config=config, retry=retry) - return [row[0] for row in rows] + return [row[0] for row in query_job] # pylint: disable=unused-argument diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 05faf6fb71ee..6a26922f3564 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -673,7 +673,7 @@ def test_job_cancel(self): # raise an error, and that the job completed (in the `retry()` # above). - def test_query_rows_w_legacy_sql_types(self): + def test_query_w_legacy_sql_types(self): naive = datetime.datetime(2016, 12, 5, 12, 41, 9) stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat()) zoned = naive.replace(tzinfo=UTC) @@ -706,7 +706,7 @@ def test_query_rows_w_legacy_sql_types(self): for example in examples: job_config = bigquery.QueryJobConfig() job_config.use_legacy_sql = True - rows = list(Config.CLIENT.query_rows( + rows = list(Config.CLIENT.query( example['sql'], job_config=job_config)) self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) @@ -806,26 +806,28 @@ def _generate_standard_sql_types_examples(self): }, ] - def test_query_rows_w_standard_sql_types(self): + def test_query_w_standard_sql_types(self): examples = self._generate_standard_sql_types_examples() for example in examples: - rows = list(Config.CLIENT.query_rows(example['sql'])) + rows = list(Config.CLIENT.query(example['sql'])) self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) self.assertEqual(rows[0][0], example['expected']) - def test_query_rows_w_failed_query(self): + def test_query_w_failed_query(self): from google.api_core.exceptions import BadRequest with self.assertRaises(BadRequest): - Config.CLIENT.query_rows('invalid syntax;') + Config.CLIENT.query('invalid syntax;').result() + + def test_query_w_timeout(self): + query_job = Config.CLIENT.query( + 'SELECT * FROM `bigquery-public-data.github_repos.commits`;', + job_id_prefix='test_query_w_timeout_') - def test_query_rows_w_timeout(self): with self.assertRaises(concurrent.futures.TimeoutError): - Config.CLIENT.query_rows( - 'SELECT * FROM `bigquery-public-data.github_repos.commits`;', - job_id_prefix='test_query_rows_w_timeout_', - timeout=1) # 1 second is much too short for this query. + # 1 second is much too short for this query. + query_job.result(timeout=1) def test_dbapi_w_standard_sql_types(self): examples = self._generate_standard_sql_types_examples() @@ -1224,9 +1226,9 @@ def test_large_query_w_public_data(self): SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format( PUBLIC, DATASET_ID, TABLE_NAME, LIMIT) - iterator = Config.CLIENT.query_rows(SQL) + query_job = Config.CLIENT.query(SQL) - rows = list(iterator) + rows = list(query_job) self.assertEqual(len(rows), LIMIT) def test_query_future(self): @@ -1256,7 +1258,7 @@ def test_query_table_def(self): job_config.table_definitions = {table_id: ec} sql = 'SELECT * FROM %s' % table_id - got_rows = Config.CLIENT.query_rows(sql, job_config=job_config) + got_rows = Config.CLIENT.query(sql, job_config=job_config) row_tuples = [r.values() for r in got_rows] by_age = operator.itemgetter(1) @@ -1280,7 +1282,7 @@ def test_query_external_table(self): sql = 'SELECT * FROM %s.%s' % (dataset_id, table_id) - got_rows = Config.CLIENT.query_rows(sql) + got_rows = Config.CLIENT.query(sql) row_tuples = [r.values() for r in got_rows] by_age = operator.itemgetter(1) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index e6f04382d724..721181e3cc81 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import concurrent.futures import copy import email import io @@ -2464,260 +2463,6 @@ def test_create_rows_json(self): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - def test_query_rows_defaults(self): - from google.api_core.page_iterator import HTTPIterator - from google.cloud.bigquery.table import Row - - JOB = 'job-id' - QUERY = 'SELECT COUNT(*) FROM persons' - RESOURCE = { - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': JOB, - }, - 'configuration': { - 'query': { - 'query': QUERY, - 'destinationTable': { - 'projectId': self.PROJECT, - 'datasetId': '_temp_dataset', - 'tableId': '_temp_table', - }, - }, - }, - 'status': { - 'state': 'DONE', - }, - } - RESULTS_RESOURCE = { - 'jobReference': RESOURCE['jobReference'], - 'jobComplete': True, - 'schema': { - 'fields': [ - {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'}, - ] - }, - 'totalRows': '3', - 'pageToken': 'next-page', - } - FIRST_PAGE = copy.deepcopy(RESULTS_RESOURCE) - FIRST_PAGE['rows'] = [ - {'f': [{'v': '1'}]}, - {'f': [{'v': '2'}]}, - ] - LAST_PAGE = copy.deepcopy(RESULTS_RESOURCE) - LAST_PAGE['rows'] = [ - {'f': [{'v': '3'}]}, - ] - del LAST_PAGE['pageToken'] - creds = _make_credentials() - http = object() - client = self._make_one(project=self.PROJECT, credentials=creds, - _http=http) - conn = client._connection = _Connection( - RESOURCE, RESULTS_RESOURCE, FIRST_PAGE, LAST_PAGE) - - rows_iter = client.query_rows(QUERY) - rows = list(rows_iter) - - self.assertEqual(rows, [Row((i,), {'field0': 0}) for i in (1, 2, 3)]) - self.assertIs(rows_iter.client, client) - self.assertIsInstance(rows_iter, HTTPIterator) - self.assertEqual(len(conn._requested), 4) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/projects/PROJECT/jobs') - self.assertIsInstance( - req['data']['jobReference']['jobId'], six.string_types) - - def test_query_rows_w_job_id(self): - from google.api_core.page_iterator import HTTPIterator - - JOB = 'job-id' - QUERY = 'SELECT COUNT(*) FROM persons' - RESOURCE = { - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': JOB, - }, - 'configuration': { - 'query': { - 'query': QUERY, - 'destinationTable': { - 'projectId': self.PROJECT, - 'datasetId': '_temp_dataset', - 'tableId': '_temp_table', - }, - }, - }, - 'status': { - 'state': 'DONE', - }, - } - RESULTS_RESOURCE = { - 'jobReference': RESOURCE['jobReference'], - 'jobComplete': True, - 'schema': { - 'fields': [ - {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'}, - ] - }, - 'totalRows': '0', - } - creds = _make_credentials() - http = object() - client = self._make_one(project=self.PROJECT, credentials=creds, - _http=http) - conn = client._connection = _Connection( - RESOURCE, RESULTS_RESOURCE, RESULTS_RESOURCE) - - rows_iter = client.query_rows(QUERY, job_id=JOB) - rows = list(rows_iter) - - self.assertEqual(rows, []) - self.assertIs(rows_iter.client, client) - self.assertIsInstance(rows_iter, HTTPIterator) - self.assertEqual(len(conn._requested), 3) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/projects/PROJECT/jobs') - self.assertEqual(req['data']['jobReference']['jobId'], JOB) - - def test_query_rows_w_job_config(self): - from google.cloud.bigquery.job import QueryJobConfig - from google.api_core.page_iterator import HTTPIterator - - JOB = 'job-id' - QUERY = 'SELECT COUNT(*) FROM persons' - RESOURCE = { - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': JOB, - }, - 'configuration': { - 'query': { - 'query': QUERY, - 'useLegacySql': True, - 'destinationTable': { - 'projectId': self.PROJECT, - 'datasetId': '_temp_dataset', - 'tableId': '_temp_table', - }, - }, - 'dryRun': True, - }, - 'status': { - 'state': 'DONE', - }, - } - RESULTS_RESOURCE = { - 'jobReference': RESOURCE['jobReference'], - 'jobComplete': True, - 'schema': { - 'fields': [ - {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'}, - ] - }, - 'totalRows': '0', - } - creds = _make_credentials() - http = object() - client = self._make_one(project=self.PROJECT, credentials=creds, - _http=http) - conn = client._connection = _Connection( - RESOURCE, RESULTS_RESOURCE, RESULTS_RESOURCE) - - job_config = QueryJobConfig() - job_config.use_legacy_sql = True - job_config.dry_run = True - rows_iter = client.query_rows(QUERY, job_id=JOB, job_config=job_config) - - self.assertIsInstance(rows_iter, HTTPIterator) - self.assertEqual(len(conn._requested), 2) - req = conn._requested[0] - configuration = req['data']['configuration'] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/projects/PROJECT/jobs') - self.assertEqual(req['data']['jobReference']['jobId'], JOB) - self.assertEqual(configuration['query']['useLegacySql'], True) - self.assertEqual(configuration['dryRun'], True) - - def test_query_rows_w_timeout_error(self): - JOB = 'job-id' - QUERY = 'SELECT COUNT(*) FROM persons' - RESOURCE = { - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': JOB, - }, - 'configuration': { - 'query': { - 'query': QUERY, - }, - }, - 'status': { - 'state': 'RUNNING', - }, - } - CANCEL_RESOURCE = {'job': RESOURCE} - creds = _make_credentials() - http = object() - client = self._make_one( - project=self.PROJECT, credentials=creds, _http=http) - conn = client._connection = _Connection(RESOURCE, CANCEL_RESOURCE) - - with mock.patch( - 'google.cloud.bigquery.job.QueryJob.result') as mock_result: - mock_result.side_effect = concurrent.futures.TimeoutError( - 'time is up') - - with self.assertRaises(concurrent.futures.TimeoutError): - client.query_rows( - QUERY, - job_id_prefix='test_query_rows_w_timeout_', - timeout=1) - - # Should attempt to create and cancel the job. - self.assertEqual(len(conn._requested), 2) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/projects/PROJECT/jobs') - cancelreq = conn._requested[1] - self.assertEqual(cancelreq['method'], 'POST') - self.assertIn( - '/projects/PROJECT/jobs/test_query_rows_w_timeout_', - cancelreq['path']) - self.assertIn('/cancel', cancelreq['path']) - - def test_query_rows_w_api_error(self): - from google.api_core.exceptions import NotFound - - QUERY = 'SELECT COUNT(*) FROM persons' - creds = _make_credentials() - http = object() - client = self._make_one( - project=self.PROJECT, credentials=creds, _http=http) - conn = client._connection = _Connection() - - # Expect a 404 error since we didn't supply a job resource. - with self.assertRaises(NotFound): - client.query_rows( - QUERY, - job_id_prefix='test_query_rows_w_error_', - timeout=1) - - # Should attempt to create and cancel the job. - self.assertEqual(len(conn._requested), 2) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/projects/PROJECT/jobs') - cancelreq = conn._requested[1] - self.assertEqual(cancelreq['method'], 'POST') - self.assertIn( - '/projects/PROJECT/jobs/test_query_rows_w_error_', - cancelreq['path']) - self.assertIn('/cancel', cancelreq['path']) - def test_list_rows(self): import datetime from google.cloud._helpers import UTC diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index a4309947b8b3..e6e0a53f05ce 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -553,6 +553,22 @@ def test_delete_table(client, to_delete): # [END delete_table] +def test_client_simple_query(client): + """Run a simple query.""" + + # [START client_simple_query] + QUERY = ( + 'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` ' + 'WHERE state = "TX" ' + 'LIMIT 100') + query_job = client.query(QUERY) + + for row in query_job: # API request + # Row values can be accessed by field name or index + assert row[0] == row.name == row['name'] + # [END client_simple_query] + + def test_client_query(client): """Run a query""" @@ -605,23 +621,6 @@ def test_client_query_w_param(client): # [END client_query_w_param] -def test_client_query_rows(client): - """Run a simple query.""" - - # [START client_query_rows] - QUERY = ( - 'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` ' - 'WHERE state = "TX" ' - 'LIMIT 100') - TIMEOUT = 30 # in seconds - rows = list(client.query_rows(QUERY, timeout=TIMEOUT)) # API request - - assert len(rows) == 100 - row = rows[0] - assert row[0] == row.name == row['name'] - # [END client_query_rows] - - def test_client_list_jobs(client): """List jobs for a project.""" diff --git a/docs/bigquery/usage.rst b/docs/bigquery/usage.rst index 0336b7277ea2..b4d9dc348b4b 100644 --- a/docs/bigquery/usage.rst +++ b/docs/bigquery/usage.rst @@ -207,6 +207,17 @@ Delete a table: Queries ------- + +Run a simple query +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Run a query and wait for it to finish: + +.. literalinclude:: snippets.py + :start-after: [START client_simple_query] + :end-before: [END client_simple_query] + + Querying data ~~~~~~~~~~~~~ @@ -231,23 +242,6 @@ See BigQuery documentation for more information on :end-before: [END client_query_w_param] -Querying Table Rows -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Run a query and wait for it to finish: - -.. literalinclude:: snippets.py - :start-after: [START client_query_rows] - :end-before: [END client_query_rows] - -.. note:: - - - Use of the ``timeout`` parameter is optional. The query will continue to - run in the background even if it takes longer the timeout allowed. The job - may be retrieved using the job ID via - :meth:`~google.cloud.bigquery.client.Client.get_job` - - List jobs for a project ~~~~~~~~~~~~~~~~~~~~~~~~~~~