Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to BQ API when there are problems reading from BQ Storage. #7633

Merged
merged 1 commit into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
except ImportError: # pragma: NO COVER
tqdm = None

import google.api_core.exceptions
from google.api_core.page_iterator import HTTPIterator

import google.cloud._helpers
Expand Down Expand Up @@ -1437,7 +1438,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
**Alpha Feature** Optional. A BigQuery Storage API client. If
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.

Expand All @@ -1448,8 +1449,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
currently supported by this method.

**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.
query result tables with the BQ Storage API. When a problem
is encountered reading a table, the tabledata.list method
from the BigQuery API is used, instead.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Expand Down Expand Up @@ -1496,9 +1498,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
progress_bar = self._get_progress_bar(progress_bar_type)

if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
else:
return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions:

  • Is the difference in progress_bar intentional?

  • Should we try to be more specific about what errors trigger fallback? I think the current exception is probably the right call, but do cases like non-enabled storage api fall under this as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the difference in progress_bar intentional?

I haven't implemented the progress bar for BQ Storage API, yet. It'll need some additional features on the BQ Storage client to do well (expose to_dataframe() per block)

Should we try to be more specific about what errors trigger fallback?

I didn't want to be too specific, because the API currently returns "InternalError" when trying to read from small anonymous tables, which seemed like there's the potential to change in the future.

You're right that this does cach non-enabled storage API, which throws

 E   google.api_core.exceptions.PermissionDenied: 403 BigQuery Storage API has not been used in project before or it is disabled.

GoogleAPICallError is a superclass of PermissionDenied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I've started work on the tutorial, I've had a change of heart regarding catching PermissionDenied and other authn/authz errors. #7661 We should rethrow those, but let the other (expected) errors get caught.

return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
except google.api_core.exceptions.GoogleAPICallError:
# There is a known issue with reading from small anonymous
# query results tables, so some errors are expected. Rather
# than throw those errors, try reading the DataFrame again, but
# with the tabledata.list API.
pass

return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)


class _EmptyRowIterator(object):
Expand Down
61 changes: 34 additions & 27 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1540,35 +1540,42 @@ def test_query_results_to_dataframe_w_bqstorage(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
df = (
Config.CLIENT.query(
query,
# There is a known issue reading small anonymous query result
# tables with the BQ Storage API. Writing to a destination
# table works around this issue.
job_config=bigquery.QueryJobConfig(
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
),
)
.result()
.to_dataframe(bqstorage_client)

job_configs = (
# There is a known issue reading small anonymous query result
# tables with the BQ Storage API. Writing to a destination
# table works around this issue.
bigquery.QueryJobConfig(
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
),
# Check that the client is able to work around the issue with
# reading small anonymous query result tables by falling back to
# the tabledata.list API.
None,
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ["id", "author", "time_ts", "dead"]
self.assertEqual(list(df), column_names)
exp_datatypes = {
"id": int,
"author": six.text_type,
"time_ts": pandas.Timestamp,
"dead": bool,
}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])
for job_config in job_configs:
df = (
Config.CLIENT.query(query, job_config=job_config)
.result()
.to_dataframe(bqstorage_client)
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ["id", "author", "time_ts", "dead"]
self.assertEqual(list(df), column_names)
exp_datatypes = {
"id": int,
"author": six.text_type,
"time_ts": pandas.Timestamp,
"dead": bool,
}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

def test_insert_rows_nested_nested(self):
# See #2951
Expand Down
45 changes: 45 additions & 0 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import pytest
import six

import google.api_core.exceptions

try:
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

try:
import pandas
except (ImportError, AttributeError): # pragma: NO COVER
Expand Down Expand Up @@ -1748,6 +1751,48 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
bqstorage_client.create_read_session.side_effect = google.api_core.exceptions.InternalServerError(
"can't read with bqstorage_client"
)
iterator_schema = [
schema.SchemaField("name", "STRING", mode="REQUIRED"),
schema.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
rows = [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
row_iterator = mut.RowIterator(
_mock_client(),
api_request,
path,
iterator_schema,
table=mut.Table("proj.dset.tbl"),
)

df = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 4) # verify the number of rows
self.assertEqual(list(df), ["name", "age"]) # verify the column names
self.assertEqual(df.name.dtype.name, "object")
self.assertEqual(df.age.dtype.name, "int64")

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
Expand Down