Skip to content

Commit

Permalink
feat(bigquery): add create_bqstorage_client param to to_dataframe
Browse files Browse the repository at this point in the history
… and `to_arrow` (#9573)

* feat(bigquery): add `create_bqstorage_client` param to `to_dataframe` and `to_arrow`

When the `create_bqstorage_client` parameter is set to `True`, the
BigQuery client constructs a BigQuery Storage API client for you. This
removes the need for boilerplate code to manually construct both clients
explitly with the same credentials.

Does this make the `bqstorage_client` parameter unnecessary? In most
cases, yes, but there are a few cases where we'll want to continue using
it.

* When partner tools use `to_dataframe`, they should continue to use
  `bqstorage_client` so that they can set the correct amended user-agent
  strings.
* When a developer needs to override the default API endpoint for the BQ
  Storage API, they'll need to manually supply a `bqstorage_client`.

* test for BQ Storage API usage in samples tests.

* fix: close bqstorage client if created by to_dataframe/to_arrow

* chore: blacken

* doc: update versionadded

* doc: update versionadded
  • Loading branch information
tswast authored Dec 17, 2019
1 parent 5447d1c commit 217bd2b
Show file tree
Hide file tree
Showing 9 changed files with 431 additions and 37 deletions.
13 changes: 13 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,19 @@ def dataset(self, dataset_id, project=None):

return DatasetReference(project, dataset_id)

def _create_bqstorage_client(self):
"""Create a BigQuery Storage API client using this client's credentials.
Returns:
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
A BigQuery Storage API client.
"""
from google.cloud import bigquery_storage_v1beta1

return bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=self._credentials
)

def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
"""API call: create the dataset via a POST request.
Expand Down
40 changes: 37 additions & 3 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3152,7 +3152,12 @@ def result(

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
Expand Down Expand Up @@ -3185,6 +3190,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
Reading from a specific partition or snapshot is not
currently supported by this method.
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.24.0
Returns:
pyarrow.Table
Expand All @@ -3199,12 +3214,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
..versionadded:: 1.17.0
"""
return self.result().to_arrow(
progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Return a pandas DataFrame from a QueryJob
Args:
Expand Down Expand Up @@ -3237,6 +3260,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
for details.
..versionadded:: 1.11.0
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.24.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
Expand All @@ -3250,6 +3283,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
)

def __iter__(self):
Expand Down
136 changes: 102 additions & 34 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,12 @@ def _to_arrow_iterable(self, bqstorage_client=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
Expand Down Expand Up @@ -1489,6 +1494,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
Reading from a specific partition or snapshot is not
currently supported by this method.
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.24.0
Returns:
pyarrow.Table
Expand All @@ -1504,22 +1519,33 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

progress_bar = self._get_progress_bar(progress_bar_type)
owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

record_batches = []
for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client):
record_batches.append(record_batch)
try:
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(record_batch.num_rows)
record_batches = []
for record_batch in self._to_arrow_iterable(
bqstorage_client=bqstorage_client
):
record_batches.append(record_batch)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(record_batch.num_rows)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()

if record_batches:
return pyarrow.Table.from_batches(record_batches)
Expand Down Expand Up @@ -1558,14 +1584,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Create a pandas DataFrame by loading all pages of a query.
Args:
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
from BigQuery.
This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.
Expand Down Expand Up @@ -1602,6 +1634,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
progress bar as a graphical dialog box.
..versionadded:: 1.11.0
create_bqstorage_client (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
Storage API client using the default API settings. The
BigQuery Storage API is a faster way to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.24.0
Returns:
pandas.DataFrame:
Expand All @@ -1621,32 +1663,44 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
if dtypes is None:
dtypes = {}

if bqstorage_client and self.max_results is not None:
if (
bqstorage_client or create_bqstorage_client
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
bqstorage_client = None

progress_bar = self._get_progress_bar(progress_bar_type)
owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

frames = []
for frame in self._to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
try:
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))
frames = []
for frame in self._to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()

# Avoid concatting an empty list.
if not frames:
Expand All @@ -1667,11 +1721,18 @@ class _EmptyRowIterator(object):
pages = ()
total_rows = 0

def to_arrow(self, progress_bar_type=None):
def to_arrow(
self,
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=False,
):
"""[Beta] Create an empty class:`pyarrow.Table`.
Args:
progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator.
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
Returns:
pyarrow.Table: An empty :class:`pyarrow.Table`.
Expand All @@ -1680,13 +1741,20 @@ def to_arrow(self, progress_bar_type=None):
raise ValueError(_NO_PYARROW_ERROR)
return pyarrow.Table.from_arrays(())

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
create_bqstorage_client=False,
):
"""Create an empty dataframe.
Args:
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
dtypes (Any): Ignored. Added for compatibility with RowIterator.
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
Returns:
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
Expand Down
33 changes: 33 additions & 0 deletions bigquery/samples/download_public_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data(client):

# [START bigquery_pandas_public_data]
# TODO(developer): Import the client library.
# from google.cloud import bigquery

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# TODO(developer): Set table_id to the fully-qualified table ID in standard
# SQL format, including the project ID and dataset ID.
table_id = "bigquery-public-data.usa_names.usa_1910_current"

# Use the BigQuery Storage API to speed-up downloads of large tables.
dataframe = client.list_rows(table_id).to_dataframe(create_bqstorage_client=True)

print(dataframe.info())
# [END bigquery_pandas_public_data]
34 changes: 34 additions & 0 deletions bigquery/samples/download_public_data_sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data_sandbox(client):

# [START bigquery_pandas_public_data_sandbox]
# TODO(developer): Import the client library.
# from google.cloud import bigquery

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# `SELECT *` is an anti-pattern in BigQuery because it is cheaper and
# faster to use the BigQuery Storage API directly, but BigQuery Sandbox
# users can only use the BigQuery Storage API to download query results.
query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"

# Use the BigQuery Storage API to speed-up downloads of large tables.
dataframe = client.query(query_string).to_dataframe(create_bqstorage_client=True)

print(dataframe.info())
# [END bigquery_pandas_public_data_sandbox]
Loading

0 comments on commit 217bd2b

Please sign in to comment.