Add option to use BQ Storage API with to_dataframe (#6854)
* Add option to use BQ Storage API with to_dataframe

This adds a faster way to read a DataFrame from a table, using the
(alpha) BigQuery Storage API. Supply a BQ Storage API client to
to_dataframe() to use the faster method.

Currently the BQ Storage API cannot read data from (small) anonymous
query results tables, which is why the system test writes the query
results to a destination table.
* Remove thread name prefix (ThreadPoolExecutor's thread_name_prefix
  argument is not available in Python 3.5)
tswast authored Dec 6, 2018
1 parent a48f30d commit 8b59a92
Showing 4 changed files with 306 additions and 15 deletions.
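For orientation, here is a minimal usage sketch of the new option, modeled on the system test in this commit. The dataset and table names are hypothetical placeholders, and reusing the main client's private credentials attribute mirrors what the system test does:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    # Reuse the main client's credentials, as the system test does.
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=client._credentials
    )

    # Work around the known issue with small anonymous query result tables
    # by writing the results to a (hypothetical) destination table.
    dest_ref = client.dataset("my_dataset").table("query_results")
    job_config = bigquery.QueryJobConfig(
        destination=dest_ref, write_disposition="WRITE_TRUNCATE"
    )

    query = """
        SELECT id, author, time_ts, dead
        FROM `bigquery-public-data.hacker_news.comments`
        LIMIT 10
    """

    # Passing a BQ Storage API client selects the faster (billable) read
    # path; omitting it falls back to tabledata.list, as before.
    df = (
        client.query(query, job_config=job_config)
        .result()
        .to_dataframe(bqstorage_client)
    )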
4 changes: 4 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
@@ -1717,6 +1717,10 @@ def list_rows(
max_results=max_results,
page_size=page_size,
extra_params=params,
table=table,
# Pass in selected_fields separately from schema so that full
# tables can be fetched without a column filter.
selected_fields=selected_fields,
)
return row_iterator

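Passing ``table`` and ``selected_fields`` into the row iterator above is what lets the results of ``list_rows()`` use the BQ Storage API as well. A minimal sketch of that path, assuming ``client`` and ``bqstorage_client`` are constructed as in the sketch above:

    # Fetch the table so the row iterator has both a schema and a table
    # reference to hand to the BQ Storage API.
    table_ref = client.dataset(
        "hacker_news", project="bigquery-public-data"
    ).table("comments")
    table = client.get_table(table_ref)

    # With no selected_fields, the full table is read without a column
    # filter. Caution: this public table is large; a real run downloads
    # every row. Pass selected_fields to restrict the read to a subset
    # of columns.
    rows = client.list_rows(table)
    df = rows.to_dataframe(bqstorage_client)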
100 changes: 94 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
@@ -18,6 +18,7 @@

import copy
import datetime
import json
import operator
import warnings

@@ -1242,6 +1243,17 @@ class RowIterator(HTTPIterator):
page_size (int, optional): The number of items to return per page.
extra_params (Dict[str, object]):
Extra query string parameters for the API call.
table (Union[ \
:class:`~google.cloud.bigquery.table.Table`, \
:class:`~google.cloud.bigquery.table.TableReference`, \
]):
Optional. The table which these rows belong to, or a reference to
it. Used to call the BigQuery Storage API to fetch rows.
selected_fields (Sequence[ \
google.cloud.bigquery.schema.SchemaField, \
]):
Optional. A subset of columns to select from this table.
"""

def __init__(
@@ -1254,6 +1266,8 @@ def __init__(
max_results=None,
page_size=None,
extra_params=None,
table=None,
selected_fields=None,
):
super(RowIterator, self).__init__(
client,
@@ -1271,6 +1285,9 @@
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._total_rows = None
self._page_size = page_size
self._table = table
self._selected_fields = selected_fields
self._project = client.project

def _get_next_page_response(self):
"""Requests the next page from the path provided.
@@ -1296,9 +1313,81 @@ def total_rows(self):
"""int: The total number of rows in the table."""
return self._total_rows

def to_dataframe(self):
def _to_dataframe_tabledata_list(self):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_headers = [field.name for field in self.schema]
# Use generator, rather than pulling the whole rowset into memory.
rows = (row.values() for row in iter(self))
return pandas.DataFrame(rows, columns=column_headers)

def _to_dataframe_bqstorage(self, bqstorage_client):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
import concurrent.futures
from google.cloud import bigquery_storage_v1beta1

if "$" in self._table.table_id:
raise ValueError(
"Reading from a specific partition is not currently supported."
)
if "@" in self._table.table_id:
raise ValueError(
"Reading from a specific snapshot is not currently supported."
)

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
if self._selected_fields is not None:
for field in self._selected_fields:
read_options.selected_fields.append(field.name)

session = bqstorage_client.create_read_session(
self._table.to_bqstorage(),
"projects/{}".format(self._project),
read_options=read_options,
)

# We need to parse the schema manually so that we can rearrange the
# columns.
schema = json.loads(session.avro_schema.schema)
columns = [field["name"] for field in schema["fields"]]

# Avoid reading rows from an empty table. pandas.concat will fail on an
# empty list.
if not session.streams:
return pandas.DataFrame(columns=columns)

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position)
return rowstream.to_dataframe(session)

with concurrent.futures.ThreadPoolExecutor() as pool:
frames = pool.map(get_dataframe, session.streams)

# rowstream.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def to_dataframe(self, bqstorage_client=None):
"""Create a pandas DataFrame from the query results.
Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
Optional. A BigQuery Storage API client. If supplied, use the
faster BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.
Returns:
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data and column
@@ -1312,11 +1401,10 @@ def to_dataframe(self):
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)

column_headers = [field.name for field in self.schema]
# Use generator, rather than pulling the whole rowset into memory.
rows = (row.values() for row in iter(self))

return pandas.DataFrame(rows, columns=column_headers)
if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client)
else:
return self._to_dataframe_tabledata_list()


class _EmptyRowIterator(object):
53 changes: 52 additions & 1 deletion bigquery/tests/system.py
@@ -28,6 +28,10 @@
import six
import pytest

try:
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None
try:
import pandas
except ImportError: # pragma: NO COVER
@@ -1496,7 +1500,7 @@ def test_query_iter(self):
def test_query_results_to_dataframe(self):
QUERY = """
SELECT id, author, time_ts, dead
from `bigquery-public-data.hacker_news.comments`
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

@@ -1518,6 +1522,53 @@ def test_query_results_to_dataframe(self):
if row[col] is not None:
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_query_results_to_dataframe_w_bqstorage(self):
dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_"))
dest_ref = dest_dataset.table("query_results")

query = """
SELECT id, author, time_ts, dead
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
df = (
Config.CLIENT.query(
query,
# There is a known issue reading small anonymous query result
# tables with the BQ Storage API. Writing to a destination
# table works around this issue.
job_config=bigquery.QueryJobConfig(
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
),
)
.result()
.to_dataframe(bqstorage_client)
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ["id", "author", "time_ts", "dead"]
self.assertEqual(list(df), column_names)
exp_datatypes = {
"id": int,
"author": six.text_type,
"time_ts": pandas.Timestamp,
"dead": bool,
}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if row[col] is not None:
self.assertIsInstance(row[col], exp_datatypes[col])

def test_insert_rows_nested_nested(self):
# See #2951
SF = bigquery.SchemaField