Add option to use BQ Storage API with to_dataframe #6854

Merged (2 commits) on Dec 6, 2018
4 changes: 4 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
@@ -1717,6 +1717,10 @@ def list_rows(
max_results=max_results,
page_size=page_size,
extra_params=params,
table=table,
# Pass in selected_fields separately from schema so that full
# tables can be fetched without a column filter.
selected_fields=selected_fields,
)
return row_iterator

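For context, a minimal usage sketch (not part of this diff) of how `list_rows` with `selected_fields` ties into the new `RowIterator` arguments; the dataset and table names below are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Placeholder dataset/table; substitute your own.
table_ref = client.dataset("my_dataset").table("my_table")
table = client.get_table(table_ref)

# Only fetch a subset of columns. These SchemaField objects are now forwarded
# to the RowIterator as `selected_fields`, separately from the full schema.
selected = [field for field in table.schema if field.name in ("id", "author")]
rows = client.list_rows(table, selected_fields=selected)

for row in rows:
    print(row["id"], row["author"])
```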
102 changes: 96 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
@@ -18,6 +18,7 @@

import copy
import datetime
import json
import operator
import warnings

@@ -1242,6 +1243,17 @@ class RowIterator(HTTPIterator):
page_size (int, optional): The number of items to return per page.
extra_params (Dict[str, object]):
Extra query string parameters for the API call.
table (Union[ \
:class:`~google.cloud.bigquery.table.Table`, \
:class:`~google.cloud.bigquery.table.TableReference`, \
]):
Optional. The table which these rows belong to, or a reference to
it. Used to call the BigQuery Storage API to fetch rows.
selected_fields (Sequence[ \
google.cloud.bigquery.schema.SchemaField, \
]):
Optional. A subset of columns to select from this table.

"""

def __init__(
@@ -1254,6 +1266,8 @@ def __init__(
max_results=None,
page_size=None,
extra_params=None,
table=None,
selected_fields=None,
):
super(RowIterator, self).__init__(
client,
@@ -1271,6 +1285,9 @@
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._total_rows = None
self._page_size = page_size
self._table = table
self._selected_fields = selected_fields
self._project = client.project

def _get_next_page_response(self):
"""Requests the next page from the path provided.
@@ -1296,9 +1313,83 @@ def total_rows(self):
"""int: The total number of rows in the table."""
return self._total_rows

def to_dataframe(self):
def _to_dataframe_tabledata_list(self):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_headers = [field.name for field in self.schema]
# Use generator, rather than pulling the whole rowset into memory.
rows = (row.values() for row in iter(self))
return pandas.DataFrame(rows, columns=column_headers)

def _to_dataframe_bqstorage(self, bqstorage_client):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
import concurrent.futures
from google.cloud import bigquery_storage_v1beta1

if "$" in self._table.table_id:
raise ValueError(
"Reading from a specific partition is not currently supported."
)
if "@" in self._table.table_id:
raise ValueError(
"Reading from a specific snapshot is not currently supported."
)

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
if self._selected_fields is not None:
for field in self._selected_fields:
read_options.selected_fields.append(field.name)

session = bqstorage_client.create_read_session(
self._table.to_bqstorage(),
"projects/{}".format(self._project),
read_options=read_options,
)

# We need to parse the schema manually so that we can rearrange the
# columns.
schema = json.loads(session.avro_schema.schema)
columns = [field["name"] for field in schema["fields"]]

# Avoid reading rows from an empty table. pandas.concat will fail on an
# empty list.
if not session.streams:
return pandas.DataFrame(columns=columns)

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position)
return rowstream.to_dataframe(session)

with concurrent.futures.ThreadPoolExecutor(
thread_name_prefix="to_dataframe_"
) as pool:
frames = pool.map(get_dataframe, session.streams)

# rowstream.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def to_dataframe(self, bqstorage_client=None):
"""Create a pandas DataFrame from the query results.

Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
Optional. A BigQuery Storage API client. If supplied, use the
faster BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.

This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.

Reading from a specific partition or snapshot is not
currently supported by this method.

**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.

Returns:
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data and column
@@ -1312,11 +1403,10 @@ def to_dataframe(self):
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)

column_headers = [field.name for field in self.schema]
# Use generator, rather than pulling the whole rowset into memory.
rows = (row.values() for row in iter(self))

return pandas.DataFrame(rows, columns=column_headers)
if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client)
else:
return self._to_dataframe_tabledata_list()


class _EmptyRowIterator(object):
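For reference, a minimal usage sketch of the new code path (not part of this diff); it assumes `google-cloud-bigquery-storage` and `fastavro` are installed and uses placeholder dataset/table names. Passing a `bqstorage_client` switches `to_dataframe` from tabledata.list to the faster, billable BigQuery Storage API:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Placeholder dataset/table; partitions ("$") and snapshots ("@") are not
# supported by this code path.
table_ref = client.dataset("my_dataset").table("my_table")
table = client.get_table(table_ref)

# With a storage client, rows are streamed via the BQ Storage API; without
# one, to_dataframe() falls back to the slower tabledata.list path.
df = client.list_rows(table).to_dataframe(bqstorage_client=bqstorage_client)
print(df.head())
```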
53 changes: 52 additions & 1 deletion bigquery/tests/system.py
@@ -28,6 +28,10 @@
import six
import pytest

try:
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None
try:
import pandas
except ImportError: # pragma: NO COVER
@@ -1496,7 +1500,7 @@ def test_query_iter(self):
def test_query_results_to_dataframe(self):
QUERY = """
SELECT id, author, time_ts, dead
from `bigquery-public-data.hacker_news.comments`
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

@@ -1518,6 +1522,53 @@ def test_query_results_to_dataframe(self):
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_query_results_to_dataframe_w_bqstorage(self):
dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_"))
dest_ref = dest_dataset.table("query_results")

query = """
SELECT id, author, time_ts, dead
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
df = (
Config.CLIENT.query(
query,
# There is a known issue reading small anonymous query result
# tables with the BQ Storage API. Writing to a destination
# table works around this issue.
job_config=bigquery.QueryJobConfig(
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
),
)
.result()
.to_dataframe(bqstorage_client)
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ["id", "author", "time_ts", "dead"]
self.assertEqual(list(df), column_names)
exp_datatypes = {
"id": int,
"author": six.text_type,
"time_ts": pandas.Timestamp,
"dead": bool,
}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

def test_insert_rows_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
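A user-level sketch of the workaround exercised in `test_query_results_to_dataframe_w_bqstorage` above (dataset and table names are placeholders): write the query results to an explicit destination table before reading them with the BQ Storage API, which avoids the known issue with small anonymous result tables:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Placeholder destination table; WRITE_TRUNCATE overwrites it on each run.
dest_ref = client.dataset("my_dataset").table("query_results")
job_config = bigquery.QueryJobConfig(
    destination=dest_ref, write_disposition="WRITE_TRUNCATE"
)

query = """
    SELECT id, author, time_ts, dead
    FROM `bigquery-public-data.hacker_news.comments`
    LIMIT 10
"""
df = client.query(query, job_config=job_config).result().to_dataframe(bqstorage_client)
```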