Add worker queue for progress bar to prevent lost tqdm updates.
The worker queue is drained by a dedicated background thread, so it is
more likely to keep up with the worker threads that are adding updates
to it.
tswast committed Apr 23, 2019
1 parent 6e3037c commit 71112b0
Showing 2 changed files with 56 additions and 9 deletions.
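At its core, this commit builds a two-stage pipeline: parser threads push per-page row counts into a worker queue, a dedicated background thread sums them and forwards batched totals to a progress queue, and the main thread drains that queue into tqdm. Below is a minimal, self-contained sketch of the same pattern — not the library's code — with a threading.Event standing in for the commit's _to_dataframe_finished flag and illustrative page counts:

import queue
import threading
import time

PROGRESS_INTERVAL = 0.2  # mirrors _PROGRESS_INTERVAL in the diff
WORKER_INTERVAL = PROGRESS_INTERVAL / 3  # mirrors _PROGRESS_WORKER_INTERVAL

def aggregate_updates(worker_queue, progress_queue, done):
    # Sum per-page counts; forward a batched total a few times per interval.
    last_update_time = time.time()
    current_update = 0
    while True:
        try:
            current_update += worker_queue.get(timeout=PROGRESS_INTERVAL)
            now = time.time()
            if now - last_update_time > WORKER_INTERVAL:
                progress_queue.put(current_update)
                last_update_time = now
                current_update = 0
        except queue.Empty:
            if done.is_set():  # no more updates are coming; flush and exit
                progress_queue.put(current_update)
                return

worker_q, progress_q, done = queue.Queue(), queue.Queue(), threading.Event()
aggregator = threading.Thread(
    target=aggregate_updates, args=(worker_q, progress_q, done)
)
aggregator.start()
for _ in range(10):
    worker_q.put(100)  # a worker thread finished a 100-row page
    time.sleep(0.03)
done.set()
aggregator.join()  # returns after the final flush

total = 0
while not progress_q.empty():
    total += progress_q.get()
print(total)  # 1000

Because the aggregator batches several counts into one put, the consumer sees a handful of waiting updates per refresh instead of one tiny update per page.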
54 changes: 49 additions & 5 deletions bigquery/google/cloud/bigquery/table.py
@@ -22,6 +22,8 @@
 import datetime
 import json
 import operator
+import threading
+import time
 import warnings
 
 import six
@@ -69,6 +71,11 @@
 _MARKER = object()
 _PROGRESS_INTERVAL = 0.2  # Time between download status updates, in seconds.
 
+# Send multiple updates from the worker threads, so there are at least a few
+# waiting next time the progress bar is updated.
+_PROGRESS_UPDATES_PER_INTERVAL = 3
+_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL
+
 
 def _reference_getter(table):
     """A :class:`~google.cloud.bigquery.table.TableReference` pointing to
@@ -1275,7 +1282,7 @@ def __repr__(self):
         return "Row({}, {})".format(self._xxx_values, f2i)
 
 
-class _FakeQueue(object):
+class _NoopProgressBarQueue(object):
     """A fake Queue class that does nothing.
 
     This is used when there is no progress bar to send updates to.
@@ -1403,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
         return pandas.concat(frames)
 
     def _to_dataframe_bqstorage_stream(
-        self, bqstorage_client, dtypes, columns, session, stream, progress_queue
+        self, bqstorage_client, dtypes, columns, session, stream, worker_queue
     ):
         position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
         rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1415,7 +1422,7 @@ def _to_dataframe_bqstorage_stream(
             frames.append(page.to_dataframe(dtypes=dtypes))
 
             try:
-                progress_queue.put_nowait(page.num_items)
+                worker_queue.put_nowait(page.num_items)
             except queue.Full:
                 # It's okay if we miss a few progress updates. Don't slow
                 # down parsing for that.
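Note that put_nowait never blocks the parsing thread. In this diff the worker queue is either unbounded or the no-op queue, so queue.Full is a purely defensive guard; with a bounded queue the same pattern would simply drop an update rather than stall the parser. A tiny sketch of that drop-on-full behavior (the maxsize=1 bound and the counts are illustrative):

import queue

q = queue.Queue(maxsize=1)
q.put_nowait(500)      # first page count fits
try:
    q.put_nowait(250)  # queue is full; don't block the parser
except queue.Full:
    pass               # a lost update only makes the bar lag briefly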
@@ -1430,6 +1437,30 @@ def _to_dataframe_bqstorage_stream(
         # the end using manually-parsed schema.
         return pandas.concat(frames)[columns]
 
+    def _process_worker_updates(self, worker_queue, progress_queue):
+        last_update_time = time.time()
+        current_update = 0
+
+        # Sum all updates in a continuous loop.
+        while True:
+            try:
+                current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL)
+
+                # Time to send to the progress bar queue?
+                current_time = time.time()
+                elapsed_time = current_time - last_update_time
+                if elapsed_time > _PROGRESS_WORKER_INTERVAL:
+                    progress_queue.put(current_update)
+                    last_update_time = current_time
+                    current_update = 0
+
+            except queue.Empty:
+                # Keep going, unless there probably aren't going to be any
+                # additional updates.
+                if self._to_dataframe_finished:
+                    progress_queue.put(current_update)
+                    return
+
     def _process_progress_updates(self, progress_queue, progress_bar):
         if progress_bar is None:
             return
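_process_progress_updates is the consumer end of the pipeline; only its signature appears here because its body is unchanged by this commit. One plausible shape for that drain step on the main thread, assuming a tqdm-like update(n) API (illustrative, not the library's actual implementation):

import queue

def drain_progress(progress_queue, progress_bar):
    # Flush every batched count currently waiting, without blocking.
    if progress_bar is None:
        return
    while True:
        try:
            progress_bar.update(progress_queue.get_nowait())
        except queue.Empty:
            return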
@@ -1486,9 +1517,16 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         self._to_dataframe_finished = False
 
         # Create a queue to track progress updates across threads.
-        progress_queue = _FakeQueue()
+        worker_queue = _NoopProgressBarQueue()
+        progress_queue = None
+        progress_thread = None
+        if progress_bar is not None:
+            worker_queue = queue.Queue()
+            progress_queue = queue.Queue()
+            progress_thread = threading.Thread(
+                target=self._process_worker_updates, args=(worker_queue, progress_queue)
+            )
+            progress_thread.start()
 
         def get_frames(pool):
             frames = []
@@ -1505,7 +1543,7 @@ def get_frames(pool):
                     columns,
                     session,
                     stream,
-                    progress_queue,
+                    worker_queue,
                 )
                 for stream in session.streams
             ]
@@ -1531,6 +1569,12 @@ def get_frames(pool):
                 # definition (enforced by the global interpreter lock).
                 self._to_dataframe_finished = True
 
+                # Shutdown all background threads, now that they should know to
+                # exit early.
+                pool.shutdown(wait=True)
+                if progress_thread is not None:
+                    progress_thread.join()
+
         # Update the progress bar one last time to close it.
         self._process_progress_updates(progress_queue, progress_bar)
         return pandas.concat(frames)
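The teardown order above is deliberate: _to_dataframe_finished is flipped first, pool.shutdown(wait=True) then waits for the parser threads to stop feeding the worker queue, and only then is the aggregator joined. Because the aggregator only checks the flag when worker_queue.get times out, the join can lag the last update by up to _PROGRESS_INTERVAL (0.2 seconds); joining before the final _process_progress_updates call guarantees the closing update sees every batched count.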
11 changes: 7 additions & 4 deletions bigquery/tests/unit/test_table.py
@@ -1857,7 +1857,7 @@ def blocking_to_dataframe(*args, **kwargs):
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
 
         # Test that full queue errors are ignored.
-        mock_queue = mock.create_autospec(mut._FakeQueue)
+        mock_queue = mock.create_autospec(mut._NoopProgressBarQueue)
         mock_queue().put_nowait.side_effect = queue.Full
 
         schema = [
@@ -1875,7 +1875,7 @@ def blocking_to_dataframe(*args, **kwargs):
             selected_fields=schema,
         )
 
-        with mock.patch.object(mut, "_FakeQueue", mock_queue), mock.patch(
+        with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch(
             "concurrent.futures.wait", wraps=concurrent.futures.wait
         ) as mock_wait:
             got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
@@ -1953,8 +1953,11 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         # Make sure that this test updated the progress bar once per page from
         # each stream.
         total_pages = len(streams) * len(mock_pages)
-        self.assertEqual(tqdm_mock().update.call_count, total_pages)
-        tqdm_mock().update.assert_called_with(len(page_items))
+        expected_total_rows = total_pages * len(page_items)
+        actual_total_rows = sum(
+            [args[0] for args, kwargs in tqdm_mock().update.call_args_list]
+        )
+        self.assertEqual(actual_total_rows, expected_total_rows)
         tqdm_mock().close.assert_called_once()
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
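Since updates are now batched by the aggregator thread, the test can no longer assert one update() call per page; it instead checks that all updates sum to the expected row total. The same summing idiom, reduced to a bare mock.Mock (the numbers are hypothetical):

from unittest import mock

bar = mock.Mock()
bar.update(30)  # e.g. two 15-row pages batched into one update
bar.update(10)
total = sum(args[0] for args, _ in bar.update.call_args_list)
assert total == 40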
