From da0982da6712bc0b072b81eed5362b38f7198721 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Apr 2019 15:58:34 -0700 Subject: [PATCH 1/2] fix: `KeyboardInterrupt` during `to_dataframe` (with BQ Storage API) no longer hangs I noticed in manually testing `to_dataframe` that it would stop the current cell when I hit Ctrl-C, but data kept on downloading in the background. Trying to exit the Python shell, I'd notice that it would hang until I pressed Ctrl-C a few more times. Rather than get the DataFrame for each stream in one big chunk, loop through each block and exit if the function needs to quit early. This follows the pattern at https://stackoverflow.com/a/29237343/101923 Update tests to ensure multiple progress interval loops. --- bigquery/google/cloud/bigquery/table.py | 53 ++++++- bigquery/setup.py | 2 +- bigquery/tests/unit/test_table.py | 177 ++++++++++++++++++++++-- 3 files changed, 216 insertions(+), 16 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 92af19c43ce5..ec0ea813615c 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -66,6 +66,7 @@ ) _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"' _MARKER = object() +_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds. def _reference_getter(table): @@ -1421,16 +1422,56 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): if not session.streams: return pandas.DataFrame(columns=columns) + # Use finished to notify worker threads when to quit. See: + # https://stackoverflow.com/a/29237343/101923 + finished = False + def get_dataframe(stream): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position) - return rowstream.to_dataframe(session, dtypes=dtypes) + rowstream = bqstorage_client.read_rows(position).rows(session) + + frames = [] + for page in rowstream.pages: + if finished: + return + frames.append(page.to_dataframe(dtypes=dtypes)) + + # Avoid errors on unlucky streams with no blocks. pandas.concat + # will fail on an empty list. + if not frames: + return pandas.DataFrame(columns=columns) + + # page.to_dataframe() does not preserve column order. Rearrange at + # the end using manually-parsed schema. + return pandas.concat(frames)[columns] + + def get_frames(pool): + frames = [] + + # Manually submit jobs and wait for download to complete rather + # than using pool.map because pool.map continues running in the + # background even if there is an exception on the main thread. + not_done = [ + pool.submit(get_dataframe, stream) for stream in session.streams + ] + + while not_done: + done, not_done = concurrent.futures.wait( + not_done, timeout=_PROGRESS_INTERVAL + ) + frames.extend([future.result() for future in done]) + return frames with concurrent.futures.ThreadPoolExecutor() as pool: - frames = pool.map(get_dataframe, session.streams) - - # rowstream.to_dataframe() does not preserve column order. Rearrange at - # the end using manually-parsed schema. + try: + frames = get_frames(pool) + finally: + # No need for a lock because reading/replacing a variable is + # defined to be an atomic operation in the Python language + # definition (enforced by the global interpreter lock). + finished = True + + # Use [columns] to ensure column order matches manually-parsed schema. return pandas.concat(frames)[columns] def _get_progress_bar(self, progress_bar_type): diff --git a/bigquery/setup.py b/bigquery/setup.py index 2c4d570d2c7e..b51fa63a9a75 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ ] extras = { "bqstorage": [ - "google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev", + "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", "fastavro>=0.21.2", ], "pandas": ["pandas>=0.17.1"], diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 7ac982394c9d..8bba2befccbc 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import itertools import json +import time import unittest import warnings @@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) - def test_to_dataframe_w_bqstorage_empty(self): + def test_to_dataframe_w_bqstorage_no_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut @@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self): @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) - def test_to_dataframe_w_bqstorage_nonempty(self): + def test_to_dataframe_w_bqstorage_empty_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut from google.cloud.bigquery_storage_v1beta1 import reader + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + session = bigquery_storage_v1beta1.types.ReadSession( + streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}] + ) + session.avro_schema.schema = json.dumps( + { + "fields": [ + {"name": "colA"}, + # Not alphabetical to test column order. + {"name": "colC"}, + {"name": "colB"}, + ] + } + ) + bqstorage_client.create_read_session.return_value = session + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) - mock_rowstream.to_dataframe.return_value = pandas.DataFrame( - [ - {"colA": 1, "colB": "abc", "colC": 2.0}, - {"colA": -1, "colB": "def", "colC": 4.0}, - ] + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + mock_pages = mock.PropertyMock(return_value=()) + type(mock_rows).pages = mock_pages + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, ) + + got = row_iterator.to_dataframe(bqstorage_client) + + column_names = ["colA", "colC", "colB"] + self.assertEqual(list(got), column_names) + self.assertTrue(got.empty) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_nonempty(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self): } ) bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) + type(mock_rows).pages = mock_pages + schema = [ schema.SchemaField("colA", "IGNORED"), schema.SchemaField("colC", "IGNORED"), @@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self): selected_fields=schema, ) - got = row_iterator.to_dataframe(bqstorage_client) + with mock.patch( + "concurrent.futures.wait", wraps=concurrent.futures.wait + ) as mock_wait: + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + column_names = ["colA", "colC", "colB"] self.assertEqual(list(got), column_names) - self.assertEqual(len(got.index), 2) + self.assertEqual(len(got.index), 6) + # Make sure that this test looped through multiple progress intervals. + self.assertGreaterEqual(mock_wait.call_count, 2) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + session = bigquery_storage_v1beta1.types.ReadSession( + streams=[ + # Use two streams because one will fail with a + # KeyboardInterrupt, and we want to check that the other stream + # ends early. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + ) + session.avro_schema.schema = json.dumps( + {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]} + ) + bqstorage_client.create_read_session.return_value = session + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) + type(mock_rows).pages = mock_pages + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + mock_rowstream.rows.return_value = mock_rows + + mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt) + type(mock_cancelled_rows).pages = mock_cancelled_pages + mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream) + mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows + + bqstorage_client.read_rows.side_effect = ( + mock_cancelled_rowstream, + mock_rowstream, + ) + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + with pytest.raises(KeyboardInterrupt): + row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + + # Should not have fetched the third page of results because exit_early + # should have been set. + self.assertLessEqual(mock_page.to_dataframe.call_count, 2) @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( From bf73284fe15c94c2ac6f922fef215f80a03ce44a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 17 Apr 2019 14:47:22 -0700 Subject: [PATCH 2/2] Refactor _to_dataframe_bqstorage_stream --- bigquery/google/cloud/bigquery/table.py | 62 ++++++++++++++----------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index ec0ea813615c..742c1a3efad1 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1387,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): return pandas.concat(frames) + def _to_dataframe_bqstorage_stream( + self, bqstorage_client, dtypes, columns, session, stream + ): + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + rowstream = bqstorage_client.read_rows(position).rows(session) + + frames = [] + for page in rowstream.pages: + if self._to_dataframe_finished: + return + frames.append(page.to_dataframe(dtypes=dtypes)) + + # Avoid errors on unlucky streams with no blocks. pandas.concat + # will fail on an empty list. + if not frames: + return pandas.DataFrame(columns=columns) + + # page.to_dataframe() does not preserve column order. Rearrange at + # the end using manually-parsed schema. + return pandas.concat(frames)[columns] + def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if bigquery_storage_v1beta1 is None: @@ -1422,28 +1443,9 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): if not session.streams: return pandas.DataFrame(columns=columns) - # Use finished to notify worker threads when to quit. See: - # https://stackoverflow.com/a/29237343/101923 - finished = False - - def get_dataframe(stream): - position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position).rows(session) - - frames = [] - for page in rowstream.pages: - if finished: - return - frames.append(page.to_dataframe(dtypes=dtypes)) - - # Avoid errors on unlucky streams with no blocks. pandas.concat - # will fail on an empty list. - if not frames: - return pandas.DataFrame(columns=columns) - - # page.to_dataframe() does not preserve column order. Rearrange at - # the end using manually-parsed schema. - return pandas.concat(frames)[columns] + # Use _to_dataframe_finished to notify worker threads when to quit. + # See: https://stackoverflow.com/a/29237343/101923 + self._to_dataframe_finished = False def get_frames(pool): frames = [] @@ -1451,8 +1453,17 @@ def get_frames(pool): # Manually submit jobs and wait for download to complete rather # than using pool.map because pool.map continues running in the # background even if there is an exception on the main thread. + # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ - pool.submit(get_dataframe, stream) for stream in session.streams + pool.submit( + self._to_dataframe_bqstorage_stream, + bqstorage_client, + dtypes, + columns, + session, + stream, + ) + for stream in session.streams ] while not_done: @@ -1469,10 +1480,9 @@ def get_frames(pool): # No need for a lock because reading/replacing a variable is # defined to be an atomic operation in the Python language # definition (enforced by the global interpreter lock). - finished = True + self._to_dataframe_finished = True - # Use [columns] to ensure column order matches manually-parsed schema. - return pandas.concat(frames)[columns] + return pandas.concat(frames) def _get_progress_bar(self, progress_bar_type): """Construct a tqdm progress bar object, if tqdm is installed."""