fix: to_dataframe respects progress_bar_type with BQ Storage API
* Add unit test for progress bar.

* Add test for full queue.
tswast committed Apr 22, 2019
1 parent ee804a1 commit b82eb6d
Showing 2 changed files with 144 additions and 15 deletions.
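For context on what the change enables: with this fix, a progress_bar_type passed to RowIterator.to_dataframe is honored even when rows are downloaded through the BigQuery Storage API, instead of being silently ignored. A minimal usage sketch follows; it assumes google-cloud-bigquery (with pandas and tqdm) and google-cloud-bigquery-storage are installed, and the project, dataset, and table names are placeholders rather than part of this commit.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client(project="my-project")  # Placeholder project.
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Placeholder table; any table readable by the client works.
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
rows = client.list_rows(client.get_table(table_ref))

# The progress_bar_type option is now respected on the BQ Storage code path.
df = rows.to_dataframe(
    bqstorage_client=bqstorage_client,
    progress_bar_type="tqdm",
)
print(df.head())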
57 changes: 53 additions & 4 deletions bigquery/google/cloud/bigquery/table.py
@@ -25,6 +25,7 @@
import warnings

import six
from six.moves import queue

try:
from google.cloud import bigquery_storage_v1beta1
@@ -66,7 +67,7 @@
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.
_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds.


def _reference_getter(table):
@@ -1274,6 +1275,16 @@ def __repr__(self):
return "Row({}, {})".format(self._xxx_values, f2i)


class _FakeQueue(object):
"""A fake Queue class that does nothing.
This is used when there is no progress bar to send updates to.
"""

def put_nowait(self, item):
"""Don't actually do anything with the item."""


class RowIterator(HTTPIterator):
"""A class for iterating through HTTP/JSON API row list responses.
@@ -1388,7 +1399,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
return pandas.concat(frames)

def _to_dataframe_bqstorage_stream(
self, bqstorage_client, dtypes, columns, session, stream
self, bqstorage_client, dtypes, columns, session, stream, progress_queue
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1399,6 +1410,13 @@ def _to_dataframe_bqstorage_stream(
return
frames.append(page.to_dataframe(dtypes=dtypes))

try:
progress_queue.put_nowait(page.num_items)
except queue.Full:
# It's okay if we miss a few progress updates. Don't slow
# down parsing for that.
pass

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
@@ -1408,7 +1426,23 @@
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
def _process_progress_updates(self, progress_queue, progress_bar):
if progress_bar is None:
return

# Output all updates since the last interval.
while True:
try:
next_update = progress_queue.get_nowait()
progress_bar.update(next_update)
except queue.Empty:
break

if self._to_dataframe_finished:
progress_bar.close()
return

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
if bigquery_storage_v1beta1 is None:
raise ValueError(_NO_BQSTORAGE_ERROR)
@@ -1447,6 +1481,11 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False

# Create a queue to track progress updates across threads.
progress_queue = _FakeQueue()
if progress_bar is not None:
progress_queue = queue.Queue()

def get_frames(pool):
frames = []

@@ -1462,6 +1501,7 @@ def get_frames(pool):
columns,
session,
stream,
progress_queue,
)
for stream in session.streams
]
@@ -1471,6 +1511,11 @@ def get_frames(pool):
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])

# The progress bar needs to update on the main thread to avoid
# contention over stdout / stderr.
self._process_progress_updates(progress_queue, progress_bar)

return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
@@ -1482,6 +1527,8 @@ def get_frames(pool):
# definition (enforced by the global interpreter lock).
self._to_dataframe_finished = True

# Update the progress bar one last time to close it.
self._process_progress_updates(progress_queue, progress_bar)
return pandas.concat(frames)

def _get_progress_bar(self, progress_bar_type):
@@ -1581,7 +1628,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):

if bqstorage_client is not None:
try:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
return self._to_dataframe_bqstorage(
bqstorage_client, dtypes, progress_bar=progress_bar
)
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
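The plumbing added to table.py above follows a producer/consumer pattern: worker threads that parse pages push each page's row count onto a queue (a no-op _FakeQueue when no progress bar was requested), and the main thread periodically drains the queue and updates the bar, because the progress bar must only be touched from the main thread to avoid contention over stdout/stderr. Below is a minimal, standalone sketch of that pattern under those assumptions; the names and timings are illustrative, not the library's internals.

import concurrent.futures
import queue
import time

import tqdm

_PROGRESS_INTERVAL = 0.2  # Seconds between progress bar refreshes.

def parse_stream(pages, progress_queue):
    # Producer (worker thread): parse each page, then report its row count.
    for num_rows in pages:
        time.sleep(0.1)  # Stand-in for page-parsing work.
        try:
            progress_queue.put_nowait(num_rows)
        except queue.Full:
            pass  # Dropping an update is fine; don't slow down parsing.

def drain_updates(progress_queue, progress_bar):
    # Consumer (main thread): push queued row counts into the progress bar.
    while True:
        try:
            progress_bar.update(progress_queue.get_nowait())
        except queue.Empty:
            break

streams = [[2, 2, 2], [2, 2, 2]]  # Two streams, three pages of two rows each.
progress_queue = queue.Queue()
progress_bar = tqdm.tqdm(total=sum(sum(pages) for pages in streams))
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = [pool.submit(parse_stream, pages, progress_queue) for pages in streams]
    while not all(future.done() for future in futures):
        concurrent.futures.wait(futures, timeout=_PROGRESS_INTERVAL)
        drain_updates(progress_queue, progress_bar)
drain_updates(progress_queue, progress_bar)  # Final drain before closing.
progress_bar.close()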
102 changes: 91 additions & 11 deletions bigquery/tests/unit/test_table.py
Expand Up @@ -22,6 +22,7 @@
import mock
import pytest
import six
from six.moves import queue

import google.api_core.exceptions

@@ -1815,9 +1816,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
)
streams = [
# Use two streams so we can check that frames are read from each stream.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
session.avro_schema.schema = json.dumps(
{
"fields": [
@@ -1835,20 +1839,25 @@ def test_to_dataframe_w_bqstorage_nonempty(self):

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
page_items = [
{"colA": 1, "colB": "abc", "colC": 2.0},
{"colA": -1, "colB": "def", "colC": 4.0},
]

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)
return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"])

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
mock_pages = (mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

# Test that full queue errors are ignored.
mock_queue = mock.create_autospec(mut._FakeQueue)
mock_queue().put_nowait.side_effect = queue.Full

schema = [
schema.SchemaField("colA", "IGNORED"),
@@ -1865,17 +1874,88 @@ def blocking_to_dataframe(*args, **kwargs):
selected_fields=schema,
)

with mock.patch(
with mock.patch.object(mut, "_FakeQueue", mock_queue), mock.patch(
"concurrent.futures.wait", wraps=concurrent.futures.wait
) as mock_wait:
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

# Are the columns in the expected order?
column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 6)

# Have expected number of rows?
total_pages = len(streams) * len(mock_pages)
total_rows = len(page_items) * total_pages
self.assertEqual(len(got.index), total_rows)

# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)

# Make sure that this test pushed to the progress queue.
self.assertEqual(mock_queue().put_nowait.call_count, total_pages)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm")
def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
streams = [
# Use two streams so we can check that progress bar updates are
# sent from each stream.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

mock_page = mock.create_autospec(reader.ReadRowsPage)
page_items = [-1, 0, 1]
type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))
mock_page.to_dataframe.return_value = pandas.DataFrame({"testcol": page_items})
mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

schema = [schema.SchemaField("testcol", "IGNORED")]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

row_iterator.to_dataframe(
bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
)

# Make sure that this test updated the progress bar once per page from
# each stream.
total_pages = len(streams) * len(mock_pages)
self.assertEqual(tqdm_mock().update.call_count, total_pages)
tqdm_mock().update.assert_called_with(len(page_items))
tqdm_mock().close.assert_called_once()

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
