Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make 'QueryResponse.fetch_data' return an iterator. #3484

Merged
merged 1 commit into from
Jun 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,44 @@ def __set__(self, instance, value):
raise ValueError(
"query parameters must be derived from AbstractQueryParameter")
instance._query_parameters = tuple(value)


def _item_to_row(iterator, resource):
"""Convert a JSON row to the native object.

.. note::

This assumes that the ``schema`` attribute has been
added to the iterator after being created, which
should be done by the caller.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type resource: dict
:param resource: An item to be converted to a row.

:rtype: tuple
:returns: The next row in the page.
"""
return _row_from_json(resource, iterator.schema)


# pylint: disable=unused-argument
def _rows_page_start(iterator, page, response):
"""Grab total rows when :class:`~google.cloud.iterator.Page` starts.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type page: :class:`~google.cloud.iterator.Page`
:param page: The page that was just created.

:type response: dict
:param response: The JSON API response for a page of rows in a table.
"""
total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
iterator.total_rows = total_rows
# pylint: enable=unused-argument
51 changes: 35 additions & 16 deletions bigquery/google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

import six

from google.cloud.iterator import HTTPIterator
from google.cloud.bigquery._helpers import _TypedProperty
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import QueryParametersProperty
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start


class _SyncQueryConfiguration(object):
Expand Down Expand Up @@ -426,28 +429,44 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
client = self._require_client(client)
params = {}

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token

if start_index is not None:
params['startIndex'] = start_index

if timeout_ms is not None:
params['timeoutMs'] = timeout_ms

path = '/projects/%s/queries/%s' % (self.project, self.name)
response = client._connection.api_request(method='GET',
path=path,
query_params=params)
self._set_properties(response)
iterator = HTTPIterator(client=client, path=path,
item_to_value=_item_to_row,
items_key='rows',
page_token=page_token,
max_results=max_results,
page_start=_rows_page_start_query,
extra_params=params)
iterator.query_result = self
# Over-ride the key used to retrieve the next page token.
iterator._NEXT_TOKEN = 'pageToken'
return iterator

total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
page_token = response.get('pageToken')
rows_data = _rows_from_json(response.get('rows', ()), self.schema)

return rows_data, total_rows, page_token
def _rows_page_start_query(iterator, page, response):
"""Update query response when :class:`~google.cloud.iterator.Page` starts.

.. note::

This assumes that the ``query_response`` attribute has been
added to the iterator after being created, which
should be done by the caller.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type page: :class:`~google.cloud.iterator.Page`
:param page: The page that was just created.

:type response: dict
:param response: The JSON API response for a page of rows in a table.
"""
iterator.query_result._set_properties(response)

This comment was marked as spam.

iterator.schema = iterator.query_result.schema
_rows_page_start(iterator, page, response)
44 changes: 2 additions & 42 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
from google.cloud.streaming.transfer import Upload
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _row_from_json
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


Expand Down Expand Up @@ -1076,47 +1077,6 @@ def _build_schema_resource(fields):
return infos


def _item_to_row(iterator, resource):
"""Convert a JSON row to the native object.

.. note::

This assumes that the ``schema`` attribute has been
added to the iterator after being created, which
should be done by the caller.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type resource: dict
:param resource: An item to be converted to a row.

:rtype: tuple
:returns: The next row in the page.
"""
return _row_from_json(resource, iterator.schema)


# pylint: disable=unused-argument
def _rows_page_start(iterator, page, response):
"""Grab total rows after a :class:`~google.cloud.iterator.Page` started.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type page: :class:`~google.cloud.iterator.Page`
:param page: The page that was just created.

:type response: dict
:param response: The JSON API response for a page of rows in a table.
"""
total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
iterator.total_rows = total_rows
# pylint: enable=unused-argument


class _UploadConfig(object):
"""Faux message FBO apitools' 'configure_request'."""
accept = ['*/*']
Expand Down
17 changes: 17 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,23 @@ def test_dump_table_w_public_data(self):
table.reload()
self._fetch_single_page(table)

def test_large_query_w_public_data(self):
PUBLIC = 'bigquery-public-data'
DATASET_NAME = 'samples'
TABLE_NAME = 'natality'
LIMIT = 1000
SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
PUBLIC, DATASET_NAME, TABLE_NAME, LIMIT)

dataset = Config.CLIENT.dataset(DATASET_NAME, project=PUBLIC)
query = Config.CLIENT.run_sync_query(SQL)
query.use_legacy_sql = False
query.run()

iterator = query.fetch_data()
rows = list(iterator)
self.assertEqual(len(rows), LIMIT)

def test_insert_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
Expand Down
16 changes: 14 additions & 2 deletions bigquery/tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ def test_fetch_data_query_not_yet_run(self):
self.assertRaises(ValueError, query.fetch_data)

def test_fetch_data_w_bound_client(self):
import six

PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
BEFORE = self._makeResource(complete=False)
AFTER = self._makeResource(complete=True)
Expand All @@ -665,7 +667,11 @@ def test_fetch_data_w_bound_client(self):
query._set_properties(BEFORE)
self.assertFalse(query.complete)

rows, total_rows, page_token = query.fetch_data()
iterator = query.fetch_data()
page = six.next(iterator.pages)
rows = list(page)
total_rows = iterator.total_rows
page_token = iterator.next_page_token

self.assertTrue(query.complete)
self.assertEqual(len(rows), 4)
Expand All @@ -682,6 +688,8 @@ def test_fetch_data_w_bound_client(self):
self.assertEqual(req['path'], '/%s' % PATH)

def test_fetch_data_w_alternate_client(self):
import six

PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
MAX = 10
TOKEN = 'TOKEN'
Expand All @@ -698,9 +706,13 @@ def test_fetch_data_w_alternate_client(self):
query._set_properties(BEFORE)
self.assertFalse(query.complete)

rows, total_rows, page_token = query.fetch_data(
iterator = query.fetch_data(
client=client2, max_results=MAX, page_token=TOKEN,
start_index=START, timeout_ms=TIMEOUT)
page = six.next(iterator.pages)
rows = list(page)
total_rows = iterator.total_rows
page_token = iterator.next_page_token

self.assertTrue(query.complete)
self.assertEqual(len(rows), 4)
Expand Down
34 changes: 12 additions & 22 deletions docs/bigquery/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ def client_run_sync_query_paged(client, _):

all_rows = []

def do_something_with(rows):
all_rows.extend(rows)
def do_something_with(row):
all_rows.append(row)

# [START client_run_sync_query_paged]
query = client.run_sync_query(LIMITED)
Expand All @@ -534,18 +534,12 @@ def do_something_with(rows):
assert len(query.rows) == PAGE_SIZE
assert [field.name for field in query.schema] == ['name']

rows = query.rows
token = query.page_token

while True:
do_something_with(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(
page_token=token) # API request
iterator = query.fetch_data() # API request(s) during iteration
for row in iterator:
do_something_with(row)
# [END client_run_sync_query_paged]

assert total_count == LIMIT
assert iterator.total_rows == LIMIT
assert len(all_rows) == LIMIT


Expand All @@ -556,8 +550,8 @@ def client_run_sync_query_timeout(client, _):

all_rows = []

def do_something_with(rows):
all_rows.extend(rows)
def do_something_with(row):
all_rows.append(row)

# [START client_run_sync_query_timeout]
query = client.run_sync_query(QUERY)
Expand All @@ -578,16 +572,12 @@ def do_something_with(rows):

assert job.state == u'DONE'

rows, total_count, token = query.fetch_data() # API request
while True:
do_something_with(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(
page_token=token) # API request
iterator = query.fetch_data() # API request(s) during iteration
for row in iterator:
do_something_with(row)
# [END client_run_sync_query_timeout]

assert len(all_rows) == total_count
assert len(all_rows) == iterator.total_rows


def _find_examples():
Expand Down