Skip to content

Commit

Permalink
BigQuery: ensure that KeyboardInterrupt during to_dataframeno lon…
Browse files Browse the repository at this point in the history
…ger hangs. (#7698)

* fix: `KeyboardInterrupt` during `to_dataframe` (with BQ Storage API) no longer hangs

I noticed in manually testing `to_dataframe` that it would stop
the current cell when I hit Ctrl-C, but data kept on downloading in the
background. Trying to exit the Python shell, I'd notice that it would
hang until I pressed Ctrl-C a few more times.

Rather than get the DataFrame for each stream in one big chunk, loop
through each block and exit if the function needs to quit early. This
follows the pattern at https://stackoverflow.com/a/29237343/101923

Update tests to ensure multiple progress interval loops.

* Refactor _to_dataframe_bqstorage_stream
  • Loading branch information
tswast authored Apr 22, 2019
1 parent f540d42 commit ee804a1
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 18 deletions.
67 changes: 59 additions & 8 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.


def _reference_getter(table):
Expand Down Expand Up @@ -1386,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):

return pandas.concat(frames)

def _to_dataframe_bqstorage_stream(
self, bqstorage_client, dtypes, columns, session, stream
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if self._to_dataframe_finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
return pandas.DataFrame(columns=columns)

# page.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
if bigquery_storage_v1beta1 is None:
Expand Down Expand Up @@ -1421,17 +1443,46 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
if not session.streams:
return pandas.DataFrame(columns=columns)

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position)
return rowstream.to_dataframe(session, dtypes=dtypes)
# Use _to_dataframe_finished to notify worker threads when to quit.
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False

def get_frames(pool):
frames = []

# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
# background even if there is an exception on the main thread.
# See: https://github.com/googleapis/google-cloud-python/pull/7698
not_done = [
pool.submit(
self._to_dataframe_bqstorage_stream,
bqstorage_client,
dtypes,
columns,
session,
stream,
)
for stream in session.streams
]

while not_done:
done, not_done = concurrent.futures.wait(
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])
return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
frames = pool.map(get_dataframe, session.streams)
try:
frames = get_frames(pool)
finally:
# No need for a lock because reading/replacing a variable is
# defined to be an atomic operation in the Python language
# definition (enforced by the global interpreter lock).
self._to_dataframe_finished = True

# rowstream.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]
return pandas.concat(frames)

def _get_progress_bar(self, progress_bar_type):
"""Construct a tqdm progress bar object, if tqdm is installed."""
Expand Down
2 changes: 1 addition & 1 deletion bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
]
extras = {
"bqstorage": [
"google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
"google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
"fastavro>=0.21.2",
],
"pandas": ["pandas>=0.17.1"],
Expand Down
177 changes: 168 additions & 9 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import itertools
import json
import time
import unittest
import warnings

Expand Down Expand Up @@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_empty(self):
def test_to_dataframe_w_bqstorage_no_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut

Expand Down Expand Up @@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_nonempty(self):
def test_to_dataframe_w_bqstorage_empty_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
)
session.avro_schema.schema = json.dumps(
{
"fields": [
{"name": "colA"},
# Not alphabetical to test column order.
{"name": "colC"},
{"name": "colB"},
]
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_rowstream.to_dataframe.return_value = pandas.DataFrame(
[
{"colA": 1, "colB": "abc", "colC": 2.0},
{"colA": -1, "colB": "def", "colC": 4.0},
]
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
mock_pages = mock.PropertyMock(return_value=())
type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertTrue(got.empty)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_nonempty(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
Expand All @@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
Expand All @@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)
with mock.patch(
"concurrent.futures.wait", wraps=concurrent.futures.wait
) as mock_wait:
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 2)
self.assertEqual(len(got.index), 6)
# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[
# Use two streams because one will fail with a
# KeyboardInterrupt, and we want to check that the other stream
# ends early.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
)
session.avro_schema.schema = json.dumps(
{"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
)
bqstorage_client.create_read_session.return_value = session

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_rowstream.rows.return_value = mock_rows

mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
type(mock_cancelled_rows).pages = mock_cancelled_pages
mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows

bqstorage_client.read_rows.side_effect = (
mock_cancelled_rowstream,
mock_rowstream,
)

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

with pytest.raises(KeyboardInterrupt):
row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

# Should not have fetched the third page of results because exit_early
# should have been set.
self.assertLessEqual(mock_page.to_dataframe.call_count, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
Expand Down

0 comments on commit ee804a1

Please sign in to comment.