Skip to content

Commit

Permalink
WIP: Add progress bar when BQ Storage API is used.
Browse files Browse the repository at this point in the history
  • Loading branch information
tswast committed Apr 17, 2019
1 parent 6fc4af2 commit 9812563
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import datetime
import json
import operator
import queue
import warnings

import six
Expand Down Expand Up @@ -1255,6 +1256,16 @@ def __repr__(self):
return "Row({}, {})".format(self._xxx_values, f2i)


class _FakeQueue(object):
"""A fake Queue class that does nothing.
This is used when there is no progress bar to send updates to.
"""

def put_nowait(self, item):
"""Don't actually do anything with the item."""


class RowIterator(HTTPIterator):
"""A class for iterating through HTTP/JSON API row list responses.
Expand Down Expand Up @@ -1368,7 +1379,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):

return pandas.concat(frames)

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
import concurrent.futures
from google.cloud import bigquery_storage_v1beta1
Expand Down Expand Up @@ -1407,6 +1418,11 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
# https://stackoverflow.com/a/29237343/101923
finished = False

# Create a queue to track progress updates across threads.
progress_queue = _FakeQueue()
if progress_bar is not None:
progress_queue = queue.Queue()

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)
Expand All @@ -1417,6 +1433,13 @@ def get_dataframe(stream):
return
frames.append(page.to_dataframe(dtypes=dtypes))

try:
progress_queue.put_nowait(page.num_items)
except queue.Full:
# It's okay if we miss a few progress updates. Don't slow
# down parsing for that.
pass

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
Expand All @@ -1441,9 +1464,31 @@ def get_frames(pool):
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])

# The progress bar needs to update on the main thread to avoid
# contention over stdout / stderr.
if progress_bar is not None:
update_progress_bar()

return frames

def update_progress_bar():
if finished:
progress_bar.close()
return

# Output all updates since the last interval.
while True:
try:
next_update = progress_queue.get_nowait()
progress_bar.update(next_update)
except queue.Empty:
return

with concurrent.futures.ThreadPoolExecutor() as pool:
if progress_bar is not None:
pool.submit(update_progress_bar)

try:
frames = get_frames(pool)
finally:
Expand Down Expand Up @@ -1549,7 +1594,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non

if bqstorage_client is not None:
try:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
return self._to_dataframe_bqstorage(bqstorage_client, dtypes, progress_bar=progress_bar)
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
Expand Down

0 comments on commit 9812563

Please sign in to comment.