diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 201a9c76e555..7557111d100e 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -678,3 +678,44 @@ def __set__(self, instance, value):
             raise ValueError(
                 "query parameters must be derived from AbstractQueryParameter")
         instance._query_parameters = tuple(value)
+
+
+def _item_to_row(iterator, resource):
+    """Convert a JSON row to the native object.
+
+    .. note::
+
+        This assumes that the ``schema`` attribute has been
+        added to the iterator after being created, which
+        should be done by the caller.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type resource: dict
+    :param resource: An item to be converted to a row.
+
+    :rtype: tuple
+    :returns: The next row in the page.
+    """
+    return _row_from_json(resource, iterator.schema)
+
+
+# pylint: disable=unused-argument
+def _rows_page_start(iterator, page, response):
+    """Grab total rows when :class:`~google.cloud.iterator.Page` starts.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type page: :class:`~google.cloud.iterator.Page`
+    :param page: The page that was just created.
+
+    :type response: dict
+    :param response: The JSON API response for a page of rows in a table.
+    """
+    total_rows = response.get('totalRows')
+    if total_rows is not None:
+        total_rows = int(total_rows)
+    iterator.total_rows = total_rows
+# pylint: enable=unused-argument
diff --git a/bigquery/google/cloud/bigquery/query.py b/bigquery/google/cloud/bigquery/query.py
index ea704bf4a8e5..6db2742bbe01 100644
--- a/bigquery/google/cloud/bigquery/query.py
+++ b/bigquery/google/cloud/bigquery/query.py
@@ -16,6 +16,7 @@
 
 import six
 
+from google.cloud.iterator import HTTPIterator
 from google.cloud.bigquery._helpers import _TypedProperty
 from google.cloud.bigquery._helpers import _rows_from_json
 from google.cloud.bigquery.dataset import Dataset
@@ -23,6 +24,8 @@
 from google.cloud.bigquery.table import _parse_schema_resource
 from google.cloud.bigquery._helpers import QueryParametersProperty
 from google.cloud.bigquery._helpers import UDFResourcesProperty
+from google.cloud.bigquery._helpers import _item_to_row
+from google.cloud.bigquery._helpers import _rows_page_start
 
 
 class _SyncQueryConfiguration(object):
@@ -426,12 +429,6 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
         client = self._require_client(client)
         params = {}
 
-        if max_results is not None:
-            params['maxResults'] = max_results
-
-        if page_token is not None:
-            params['pageToken'] = page_token
-
         if start_index is not None:
             params['startIndex'] = start_index
 
@@ -439,15 +436,37 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
             params['timeoutMs'] = timeout_ms
 
         path = '/projects/%s/queries/%s' % (self.project, self.name)
-        response = client._connection.api_request(method='GET',
-                                                  path=path,
-                                                  query_params=params)
-        self._set_properties(response)
+        iterator = HTTPIterator(client=client, path=path,
+                                item_to_value=_item_to_row,
+                                items_key='rows',
+                                page_token=page_token,
+                                max_results=max_results,
+                                page_start=_rows_page_start_query,
+                                extra_params=params)
+        iterator.query_result = self
+        # Override the key used to retrieve the next page token.
+        iterator._NEXT_TOKEN = 'pageToken'
+        return iterator
 
-        total_rows = response.get('totalRows')
-        if total_rows is not None:
-            total_rows = int(total_rows)
-        page_token = response.get('pageToken')
-        rows_data = _rows_from_json(response.get('rows', ()), self.schema)
-        return rows_data, total_rows, page_token
-
+
+def _rows_page_start_query(iterator, page, response):
+    """Update query response when :class:`~google.cloud.iterator.Page` starts.
+
+    .. note::
+
+        This assumes that the ``query_result`` attribute has been
+        added to the iterator after being created, which
+        should be done by the caller.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type page: :class:`~google.cloud.iterator.Page`
+    :param page: The page that was just created.
+
+    :type response: dict
+    :param response: The JSON API response for a page of rows in a table.
+    """
+    iterator.query_result._set_properties(response)
+    iterator.schema = iterator.query_result.schema
+    _rows_page_start(iterator, page, response)
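With this change, `QueryResults.fetch_data` returns an `HTTPIterator` instead of a `(rows, total_rows, page_token)` triple. A minimal consumption sketch (not part of the patch; it assumes a google-cloud-bigquery client of this era with working credentials, and the SQL is illustrative):

    # Sketch: consuming the iterator now returned by fetch_data().
    from google.cloud import bigquery

    client = bigquery.Client()
    query = client.run_sync_query('SELECT 17')  # illustrative query
    query.run()  # API request

    iterator = query.fetch_data()
    for row in iterator:        # further pages are requested lazily
        print(row)              # each row is a tuple built by _item_to_row
    print(iterator.total_rows)  # set by _rows_page_start on each page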
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 92ebfebb2d6e..662cc670d541 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -32,7 +32,8 @@
 from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
 from google.cloud.streaming.transfer import Upload
 from google.cloud.bigquery.schema import SchemaField
-from google.cloud.bigquery._helpers import _row_from_json
+from google.cloud.bigquery._helpers import _item_to_row
+from google.cloud.bigquery._helpers import _rows_page_start
 from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
 
 
@@ -1076,47 +1077,6 @@ def _build_schema_resource(fields):
     return infos
 
 
-def _item_to_row(iterator, resource):
-    """Convert a JSON row to the native object.
-
-    .. note::
-
-        This assumes that the ``schema`` attribute has been
-        added to the iterator after being created, which
-        should be done by the caller.
-
-    :type iterator: :class:`~google.cloud.iterator.Iterator`
-    :param iterator: The iterator that is currently in use.
-
-    :type resource: dict
-    :param resource: An item to be converted to a row.
-
-    :rtype: tuple
-    :returns: The next row in the page.
-    """
-    return _row_from_json(resource, iterator.schema)
-
-
-# pylint: disable=unused-argument
-def _rows_page_start(iterator, page, response):
-    """Grab total rows after a :class:`~google.cloud.iterator.Page` started.
-
-    :type iterator: :class:`~google.cloud.iterator.Iterator`
-    :param iterator: The iterator that is currently in use.
-
-    :type page: :class:`~google.cloud.iterator.Page`
-    :param page: The page that was just created.
-
-    :type response: dict
-    :param response: The JSON API response for a page of rows in a table.
-    """
-    total_rows = response.get('totalRows')
-    if total_rows is not None:
-        total_rows = int(total_rows)
-    iterator.total_rows = total_rows
-# pylint: enable=unused-argument
-
-
 class _UploadConfig(object):
     """Faux message FBO apitools' 'configure_request'."""
     accept = ['*/*']
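`table.py` now imports `_item_to_row` and `_rows_page_start` from `_helpers` instead of defining them locally, so `Table.fetch_data` can drive an `HTTPIterator` with the same callbacks. A sketch of that wiring (the actual `Table.fetch_data` body is outside this diff; `self.path` and `self._schema` stand for the table's resource path and cached schema):

    # Sketch: wiring the shared helpers into an HTTPIterator for table rows.
    iterator = HTTPIterator(client=client, path='%s/data' % self.path,
                            item_to_value=_item_to_row, items_key='rows',
                            page_token=page_token, max_results=max_results,
                            page_start=_rows_page_start)
    # The caller attaches `schema`; _item_to_row reads it to decode each row.
    iterator.schema = self._schema
    return iterator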
- """ - total_rows = response.get('totalRows') - if total_rows is not None: - total_rows = int(total_rows) - iterator.total_rows = total_rows -# pylint: enable=unused-argument - - class _UploadConfig(object): """Faux message FBO apitools' 'configure_request'.""" accept = ['*/*'] diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 210951305b44..456953194a53 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -830,6 +830,23 @@ def test_dump_table_w_public_data(self): table.reload() self._fetch_single_page(table) + def test_large_query_w_public_data(self): + PUBLIC = 'bigquery-public-data' + DATASET_NAME = 'samples' + TABLE_NAME = 'natality' + LIMIT = 1000 + SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format( + PUBLIC, DATASET_NAME, TABLE_NAME, LIMIT) + + dataset = Config.CLIENT.dataset(DATASET_NAME, project=PUBLIC) + query = Config.CLIENT.run_sync_query(SQL) + query.use_legacy_sql = False + query.run() + + iterator = query.fetch_data() + rows = list(iterator) + self.assertEqual(len(rows), LIMIT) + def test_insert_nested_nested(self): # See #2951 SF = bigquery.SchemaField diff --git a/bigquery/tests/unit/test_query.py b/bigquery/tests/unit/test_query.py index c2b3ce5496e1..d7977a4e7d0c 100644 --- a/bigquery/tests/unit/test_query.py +++ b/bigquery/tests/unit/test_query.py @@ -654,6 +654,8 @@ def test_fetch_data_query_not_yet_run(self): self.assertRaises(ValueError, query.fetch_data) def test_fetch_data_w_bound_client(self): + import six + PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME) BEFORE = self._makeResource(complete=False) AFTER = self._makeResource(complete=True) @@ -665,7 +667,11 @@ def test_fetch_data_w_bound_client(self): query._set_properties(BEFORE) self.assertFalse(query.complete) - rows, total_rows, page_token = query.fetch_data() + iterator = query.fetch_data() + page = six.next(iterator.pages) + rows = list(page) + total_rows = iterator.total_rows + page_token = iterator.next_page_token self.assertTrue(query.complete) self.assertEqual(len(rows), 4) @@ -682,6 +688,8 @@ def test_fetch_data_w_bound_client(self): self.assertEqual(req['path'], '/%s' % PATH) def test_fetch_data_w_alternate_client(self): + import six + PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME) MAX = 10 TOKEN = 'TOKEN' @@ -698,9 +706,13 @@ def test_fetch_data_w_alternate_client(self): query._set_properties(BEFORE) self.assertFalse(query.complete) - rows, total_rows, page_token = query.fetch_data( + iterator = query.fetch_data( client=client2, max_results=MAX, page_token=TOKEN, start_index=START, timeout_ms=TIMEOUT) + page = six.next(iterator.pages) + rows = list(page) + total_rows = iterator.total_rows + page_token = iterator.next_page_token self.assertTrue(query.complete) self.assertEqual(len(rows), 4) diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index 6e395add09fc..8f630d772801 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -520,8 +520,8 @@ def client_run_sync_query_paged(client, _): all_rows = [] - def do_something_with(rows): - all_rows.extend(rows) + def do_something_with(row): + all_rows.append(row) # [START client_run_sync_query_paged] query = client.run_sync_query(LIMITED) @@ -534,18 +534,12 @@ def do_something_with(rows): assert len(query.rows) == PAGE_SIZE assert [field.name for field in query.schema] == ['name'] - rows = query.rows - token = query.page_token - - while True: - do_something_with(rows) - if token is None: - break - rows, total_count, token = query.fetch_data( - 
diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py
index 6e395add09fc..8f630d772801 100644
--- a/docs/bigquery/snippets.py
+++ b/docs/bigquery/snippets.py
@@ -520,8 +520,8 @@ def client_run_sync_query_paged(client, _):
 
     all_rows = []
 
-    def do_something_with(rows):
-        all_rows.extend(rows)
+    def do_something_with(row):
+        all_rows.append(row)
 
     # [START client_run_sync_query_paged]
     query = client.run_sync_query(LIMITED)
@@ -534,18 +534,12 @@ def do_something_with(rows):
     assert len(query.rows) == PAGE_SIZE
     assert [field.name for field in query.schema] == ['name']
 
-    rows = query.rows
-    token = query.page_token
-
-    while True:
-        do_something_with(rows)
-        if token is None:
-            break
-        rows, total_count, token = query.fetch_data(
-            page_token=token)      # API request
+    iterator = query.fetch_data()  # API request(s) during iteration
+    for row in iterator:
+        do_something_with(row)
     # [END client_run_sync_query_paged]
 
-    assert total_count == LIMIT
+    assert iterator.total_rows == LIMIT
     assert len(all_rows) == LIMIT
 
 
@@ -556,8 +550,8 @@ def do_something_with(rows):
 
     all_rows = []
 
-    def do_something_with(rows):
-        all_rows.extend(rows)
+    def do_something_with(row):
+        all_rows.append(row)
 
     # [START client_run_sync_query_timeout]
     query = client.run_sync_query(QUERY)
@@ -578,16 +572,12 @@ def do_something_with(rows):
 
     assert job.state == u'DONE'
 
-    rows, total_count, token = query.fetch_data()  # API request
-    while True:
-        do_something_with(rows)
-        if token is None:
-            break
-        rows, total_count, token = query.fetch_data(
-            page_token=token)      # API request
+    iterator = query.fetch_data()  # API request(s) during iteration
+    for row in iterator:
+        do_something_with(row)
     # [END client_run_sync_query_timeout]
 
-    assert len(all_rows) == total_count
+    assert len(all_rows) == iterator.total_rows
 
 
 def _find_examples():
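The snippet updates above double as a migration guide for callers of the old tuple-returning API; condensed into a before/after sketch:

    # Before: manual page-token loop.
    rows, total_rows, token = query.fetch_data()
    while True:
        do_something_with(rows)
        if token is None:
            break
        rows, total_rows, token = query.fetch_data(page_token=token)

    # After: the iterator drives paging; the total lives on the iterator.
    iterator = query.fetch_data()
    for row in iterator:
        do_something_with(row)
    total_rows = iterator.total_rows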