From 8593692b054670cb1af515e74cb77ab7d81ce718 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 17 Apr 2019 14:47:22 -0700 Subject: [PATCH] Refactor _to_dataframe_bqstorage_stream --- bigquery/google/cloud/bigquery/table.py | 62 ++++++++++++++----------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index d3e0f7d4c7bc..4584e964894e 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1368,6 +1368,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): return pandas.concat(frames) + def _to_dataframe_bqstorage_stream( + self, bqstorage_client, dtypes, columns, session, stream + ): + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + rowstream = bqstorage_client.read_rows(position).rows(session) + + frames = [] + for page in rowstream.pages: + if self._to_dataframe_finished: + return + frames.append(page.to_dataframe(dtypes=dtypes)) + + # Avoid errors on unlucky streams with no blocks. pandas.concat + # will fail on an empty list. + if not frames: + return pandas.DataFrame(columns=columns) + + # page.to_dataframe() does not preserve column order. Rearrange at + # the end using manually-parsed schema. + return pandas.concat(frames)[columns] + def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" import concurrent.futures @@ -1403,28 +1424,9 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): if not session.streams: return pandas.DataFrame(columns=columns) - # Use finished to notify worker threads when to quit. See: - # https://stackoverflow.com/a/29237343/101923 - finished = False - - def get_dataframe(stream): - position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position).rows(session) - - frames = [] - for page in rowstream.pages: - if finished: - return - frames.append(page.to_dataframe(dtypes=dtypes)) - - # Avoid errors on unlucky streams with no blocks. pandas.concat - # will fail on an empty list. - if not frames: - return pandas.DataFrame(columns=columns) - - # page.to_dataframe() does not preserve column order. Rearrange at - # the end using manually-parsed schema. - return pandas.concat(frames)[columns] + # Use _to_dataframe_finished to notify worker threads when to quit. + # See: https://stackoverflow.com/a/29237343/101923 + self._to_dataframe_finished = False def get_frames(pool): frames = [] @@ -1432,8 +1434,17 @@ def get_frames(pool): # Manually submit jobs and wait for download to complete rather # than using pool.map because pool.map continues running in the # background even if there is an exception on the main thread. + # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ - pool.submit(get_dataframe, stream) for stream in session.streams + pool.submit( + self._to_dataframe_bqstorage_stream, + bqstorage_client, + dtypes, + columns, + session, + stream, + ) + for stream in session.streams ] while not_done: @@ -1450,10 +1461,9 @@ def get_frames(pool): # No need for a lock because reading/replacing a variable is # defined to be an atomic operation in the Python language # definition (enforced by the global interpreter lock). - finished = True + self._to_dataframe_finished = True - # Use [columns] to ensure column order matches manually-parsed schema. - return pandas.concat(frames)[columns] + return pandas.concat(frames) def _get_progress_bar(self, progress_bar_type): """Construct a tqdm progress bar object, if tqdm is installed."""