From 0971781464280f9438c81ef34bd31a449895c8d1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 12 Apr 2019 09:42:52 -0700 Subject: [PATCH] Use finally so that 'finished' is always set at end of looping. Update tests to ensure multiple progress interval loops. --- bigquery/google/cloud/bigquery/table.py | 11 +++++----- bigquery/tests/unit/test_table.py | 29 +++++++++++++++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index cb338b4a0e75..a6de362b407c 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,9 +1403,9 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): if not session.streams: return pandas.DataFrame(columns=columns) - # Use exit_early to notify worker threads when to quit. See: + # Use finished to notify worker threads when to quit. See: # https://stackoverflow.com/a/29237343/101923 - exit_early = False + finished = False def get_dataframe(stream): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) @@ -1413,7 +1413,7 @@ def get_dataframe(stream): frames = [] for page in rowstream.pages: - if exit_early: + if finished: return frames.append(page.to_dataframe(dtypes=dtypes)) @@ -1446,12 +1446,11 @@ def get_frames(pool): with concurrent.futures.ThreadPoolExecutor() as pool: try: frames = get_frames(pool) - except: + finally: # No need for a lock because reading/replacing a variable is # defined to be an atomic operation in the Python language # definition (enforced by the global interpreter lock). - exit_early = True - raise + finished = True # Use [columns] to ensure column order matches manually-parsed schema. return pandas.concat(frames)[columns] diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 7755f06695f4..00fa968b4995 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import itertools import json import time @@ -1784,6 +1785,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self): from google.cloud.bigquery import table as mut from google.cloud.bigquery_storage_v1beta1 import reader + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -1808,12 +1812,18 @@ def test_to_dataframe_w_bqstorage_nonempty(self): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + mock_page = mock.create_autospec(reader.ReadRowsPage) - mock_page.to_dataframe.return_value = pandas.DataFrame( - {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, - columns=["colA", "colB", "colC"], - ) - mock_pages = mock.PropertyMock(return_value=(mock_page,)) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) type(mock_rows).pages = mock_pages schema = [ @@ -1831,11 +1841,16 @@ def test_to_dataframe_w_bqstorage_nonempty(self): selected_fields=schema, ) - got = row_iterator.to_dataframe(bqstorage_client) + with mock.patch( + "concurrent.futures.wait", wraps=concurrent.futures.wait + ) as mock_wait: + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) column_names = ["colA", "colC", "colB"] self.assertEqual(list(got), column_names) - self.assertEqual(len(got.index), 2) + self.assertEqual(len(got.index), 6) + # Make sure that this test looped through multiple progress intervals. + self.assertGreaterEqual(mock_wait.call_count, 2) @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(