diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 92af19c43ce5..742c1a3efad1 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -66,6 +66,7 @@
 )
 _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
 _MARKER = object()
+_PROGRESS_INTERVAL = 1.0  # Time between download status updates, in seconds.
 
 
 def _reference_getter(table):
@@ -1386,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
 
         return pandas.concat(frames)
 
+    def _to_dataframe_bqstorage_stream(
+        self, bqstorage_client, dtypes, columns, session, stream
+    ):
+        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
+        rowstream = bqstorage_client.read_rows(position).rows(session)
+
+        frames = []
+        for page in rowstream.pages:
+            if self._to_dataframe_finished:
+                return
+            frames.append(page.to_dataframe(dtypes=dtypes))
+
+        # Avoid errors on unlucky streams with no blocks. pandas.concat
+        # will fail on an empty list.
+        if not frames:
+            return pandas.DataFrame(columns=columns)
+
+        # page.to_dataframe() does not preserve column order. Rearrange at
+        # the end using manually-parsed schema.
+        return pandas.concat(frames)[columns]
+
     def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         """Use (faster, but billable) BQ Storage API to construct DataFrame."""
         if bigquery_storage_v1beta1 is None:
@@ -1421,17 +1443,46 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         if not session.streams:
             return pandas.DataFrame(columns=columns)
 
-        def get_dataframe(stream):
-            position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
-            rowstream = bqstorage_client.read_rows(position)
-            return rowstream.to_dataframe(session, dtypes=dtypes)
+        # Use _to_dataframe_finished to notify worker threads when to quit.
+        # See: https://stackoverflow.com/a/29237343/101923
+        self._to_dataframe_finished = False
+
+        def get_frames(pool):
+            frames = []
+
+            # Manually submit jobs and wait for download to complete rather
+            # than using pool.map because pool.map continues running in the
+            # background even if there is an exception on the main thread.
+            # See: https://github.com/googleapis/google-cloud-python/pull/7698
+            not_done = [
+                pool.submit(
+                    self._to_dataframe_bqstorage_stream,
+                    bqstorage_client,
+                    dtypes,
+                    columns,
+                    session,
+                    stream,
+                )
+                for stream in session.streams
+            ]
+
+            while not_done:
+                done, not_done = concurrent.futures.wait(
+                    not_done, timeout=_PROGRESS_INTERVAL
+                )
+                frames.extend([future.result() for future in done])
+            return frames
 
         with concurrent.futures.ThreadPoolExecutor() as pool:
-            frames = pool.map(get_dataframe, session.streams)
+            try:
+                frames = get_frames(pool)
+            finally:
+                # No need for a lock because reading/replacing a variable is
+                # defined to be an atomic operation in the Python language
+                # definition (enforced by the global interpreter lock).
+                self._to_dataframe_finished = True
 
-        # rowstream.to_dataframe() does not preserve column order. Rearrange at
-        # the end using manually-parsed schema.
-        return pandas.concat(frames)[columns]
+        return pandas.concat(frames)
 
     def _get_progress_bar(self, progress_bar_type):
         """Construct a tqdm progress bar object, if tqdm is installed."""
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 2c4d570d2c7e..b51fa63a9a75 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -37,7 +37,7 @@
 ]
 extras = {
     "bqstorage": [
-        "google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
+        "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
         "fastavro>=0.21.2",
     ],
     "pandas": ["pandas>=0.17.1"],
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 7ac982394c9d..8bba2befccbc 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import concurrent.futures
 import itertools
 import json
+import time
 import unittest
 import warnings
 
@@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
-    def test_to_dataframe_w_bqstorage_empty(self):
+    def test_to_dataframe_w_bqstorage_no_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
 
@@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
-    def test_to_dataframe_w_bqstorage_nonempty(self):
+    def test_to_dataframe_w_bqstorage_empty_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
+        )
+        session.avro_schema.schema = json.dumps(
+            {
+                "fields": [
+                    {"name": "colA"},
+                    # Not alphabetical to test column order.
+ {"name": "colC"}, + {"name": "colB"}, + ] + } + ) + bqstorage_client.create_read_session.return_value = session + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) - mock_rowstream.to_dataframe.return_value = pandas.DataFrame( - [ - {"colA": 1, "colB": "abc", "colC": 2.0}, - {"colA": -1, "colB": "def", "colC": 4.0}, - ] + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + mock_pages = mock.PropertyMock(return_value=()) + type(mock_rows).pages = mock_pages + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, ) + + got = row_iterator.to_dataframe(bqstorage_client) + + column_names = ["colA", "colC", "colB"] + self.assertEqual(list(got), column_names) + self.assertTrue(got.empty) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_nonempty(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self): } ) bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) + type(mock_rows).pages = mock_pages + schema = [ schema.SchemaField("colA", "IGNORED"), schema.SchemaField("colC", "IGNORED"), @@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self): selected_fields=schema, ) - got = row_iterator.to_dataframe(bqstorage_client) + with mock.patch( + "concurrent.futures.wait", wraps=concurrent.futures.wait + ) as mock_wait: + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + column_names = ["colA", "colC", "colB"] self.assertEqual(list(got), column_names) - self.assertEqual(len(got.index), 2) + self.assertEqual(len(got.index), 6) + # Make sure that this test looped through multiple progress intervals. 
+        self.assertGreaterEqual(mock_wait.call_count, 2)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        # Speed up testing.
+        mut._PROGRESS_INTERVAL = 0.01
+
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=[
+                # Use two streams because one will fail with a
+                # KeyboardInterrupt, and we want to check that the other stream
+                # ends early.
+                {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+                {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+            ]
+        )
+        session.avro_schema.schema = json.dumps(
+            {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+        )
+        bqstorage_client.create_read_session.return_value = session
+
+        def blocking_to_dataframe(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
+            time.sleep(2 * mut._PROGRESS_INTERVAL)
+            return pandas.DataFrame(
+                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
+                columns=["colA", "colB", "colC"],
+            )
+
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
+        type(mock_rows).pages = mock_pages
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        mock_rowstream.rows.return_value = mock_rows
+
+        mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
+        type(mock_cancelled_rows).pages = mock_cancelled_pages
+        mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows
+
+        bqstorage_client.read_rows.side_effect = (
+            mock_cancelled_rowstream,
+            mock_rowstream,
+        )
+
+        schema = [
+            schema.SchemaField("colA", "IGNORED"),
+            schema.SchemaField("colB", "IGNORED"),
+            schema.SchemaField("colC", "IGNORED"),
+        ]
+
+        row_iterator = mut.RowIterator(
+            _mock_client(),
+            None,  # api_request: ignored
+            None,  # path: ignored
+            schema,
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+            selected_fields=schema,
+        )
+
+        with pytest.raises(KeyboardInterrupt):
+            row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
+
+        # Should not have fetched the third page of results because exit_early
+        # should have been set.
+        self.assertLessEqual(mock_page.to_dataframe.call_count, 2)
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
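
For readers unfamiliar with the cooperative-cancellation pattern the table.py changes rely on (worker threads check a shared "finished" flag between pages, while the main thread polls with concurrent.futures.wait in _PROGRESS_INTERVAL slices and sets the flag in a finally block), here is a minimal standalone sketch. It uses only the Python standard library; names like download_stream, download_all, and PAGES are illustrative stand-ins, not part of google-cloud-bigquery.

import concurrent.futures
import time

_PROGRESS_INTERVAL = 1.0  # Seconds between checks for completed futures.
_finished = False  # Reading/replacing a bool is atomic under the GIL.

PAGES = [[1, 2], [3, 4], [5, 6]]  # Stand-in for the pages of one read stream.


def download_stream(stream_id):
    rows = []
    for page in PAGES:
        if _finished:  # Exit early if the main thread gave up.
            return []
        time.sleep(0.1)  # Simulate a slow page download.
        rows.extend(page)
    return rows


def download_all(stream_ids):
    global _finished
    results = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        try:
            # Submit jobs manually instead of pool.map so an exception on the
            # main thread is not hidden while workers keep running.
            not_done = [pool.submit(download_stream, sid) for sid in stream_ids]
            while not_done:
                done, not_done = concurrent.futures.wait(
                    not_done, timeout=_PROGRESS_INTERVAL
                )
                results.extend(future.result() for future in done)
        finally:
            # Tell any still-running workers to stop at their next page check
            # before the pool shutdown blocks waiting for them.
            _finished = True
    return results


if __name__ == "__main__":
    print(download_all(["stream/1", "stream/2"]))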