Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: Get query results as pandas DataFrame #4354

Merged
merged 21 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0343a64
adds to_dataframe() to QueryJob
alixhami Nov 7, 2017
b74e6d4
removes unnecessary system test
alixhami Nov 7, 2017
e89b8de
adds docstring to to_dataframe()
alixhami Nov 7, 2017
8184716
updates to _make_resource() after rebasing for #4355
alixhami Nov 7, 2017
bc20f91
skips to_dataframe() tests if pandas is not installed
alixhami Nov 7, 2017
2b8ca85
imports pandas at module level and raises exception in to_dataframe()…
alixhami Nov 10, 2017
5c52dc6
adds pandas as extra for installation
alixhami Nov 10, 2017
484ab91
updates docstring to google style
alixhami Nov 10, 2017
4db3f4b
adds pandas extra to nox environment
alixhami Nov 10, 2017
a31e79d
adds 'no cover' pragma for pandas import errors
alixhami Nov 10, 2017
03b7fd5
adds test for when pandas is None
alixhami Nov 13, 2017
0c7bf88
fixes lint error
alixhami Nov 13, 2017
84994a7
adds RowIterator class
alixhami Nov 14, 2017
04f76f5
moves to_dataframe() to RowIterator
alixhami Nov 14, 2017
4fd0cc0
adds test for pandas handling of basic BigQuery data types
alixhami Nov 15, 2017
321b56a
moves schema to RowIterator constructor
alixhami Nov 15, 2017
da52040
adds tests for column dtypes
alixhami Nov 17, 2017
83d9e3c
adds test for query results to_dataframe() with nested schema
alixhami Nov 17, 2017
10fcd7c
updates system test for to_dataframe to check types
alixhami Nov 17, 2017
6762f95
adds to_dataframe() helper to QueryJob
alixhami Nov 18, 2017
0802ca8
updates pandas version to latest version that passes unit tests
alixhami Nov 22, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def _rows_page_start(iterator, page, response):
total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
iterator.total_rows = total_rows
iterator._total_rows = total_rows
# pylint: enable=unused-argument


Expand Down
17 changes: 5 additions & 12 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@

from google.cloud.bigquery._helpers import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
from google.cloud.bigquery._helpers import _field_to_index_mapping
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start
from google.cloud.bigquery._helpers import _snake_to_camel_case
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -48,6 +45,7 @@
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
from google.cloud.bigquery.table import _row_from_mapping

Expand Down Expand Up @@ -1180,7 +1178,7 @@ def list_rows(self, table, selected_fields=None, max_results=None,
:type retry: :class:`google.api_core.retry.Retry`
:param retry: (Optional) How to retry the RPC.

:rtype: :class:`~google.api_core.page_iterator.Iterator`
:rtype: :class:`~google.cloud.bigquery.table.RowIterator`
:returns: Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s. During each
page, the iterator will have the ``total_rows`` attribute
Expand Down Expand Up @@ -1208,20 +1206,15 @@ def list_rows(self, table, selected_fields=None, max_results=None,
if start_index is not None:
params['startIndex'] = start_index

iterator = page_iterator.HTTPIterator(
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry),
path='%s/data' % (table.path,),
item_to_value=_item_to_row,
items_key='rows',
schema=schema,
page_token=page_token,
next_token='pageToken',
max_results=max_results,
page_start=_rows_page_start,
extra_params=params)
iterator.schema = schema
iterator._field_to_index = _field_to_index_mapping(schema)
return iterator
return row_iterator

def list_partitions(self, table, retry=DEFAULT_RETRY):
"""List the partitions in a table.
Expand Down
15 changes: 14 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
:type retry: :class:`google.api_core.retry.Retry`
:param retry: (Optional) How to retry the call that retrieves rows.

:rtype: :class:`~google.api_core.page_iterator.Iterator`
:rtype: :class:`~google.cloud.bigquery.table.RowIterator`
:returns:
Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
During each page, the iterator will have the ``total_rows``
Expand All @@ -1949,6 +1949,19 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
return self._client.list_rows(dest_table, selected_fields=schema,
retry=retry)

def to_dataframe(self):

This comment was marked as spam.

This comment was marked as spam.

"""Return a pandas DataFrame from a QueryJob

Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
from the destination table's schema.

Raises:
ValueError: If the `pandas` library cannot be imported.
"""
return self.result().to_dataframe()

def __iter__(self):
return iter(self.result())

Expand Down
78 changes: 78 additions & 0 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@
import operator

import six
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.api_core.page_iterator import HTTPIterator

from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _millis_from_datetime
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start
from google.cloud.bigquery._helpers import _snake_to_camel_case
from google.cloud.bigquery._helpers import _field_to_index_mapping
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
Expand Down Expand Up @@ -1023,3 +1032,72 @@ def __repr__(self):
key=operator.itemgetter(1))
f2i = '{' + ', '.join('%r: %d' % item for item in items) + '}'
return 'Row({}, {})'.format(self._xxx_values, f2i)


class RowIterator(HTTPIterator):
"""A class for iterating through HTTP/JSON API row list responses.

Args:
client (google.cloud.bigquery.Client): The API client.
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
The function to use to make API requests.
path (str): The method path to query for the list of items.
page_token (str): A token identifying a page in a result set to start
fetching results from.
max_results (int): The maximum number of results to fetch.
extra_params (dict): Extra query string parameters for the API call.

.. autoattribute:: pages
"""

def __init__(self, client, api_request, path, schema, page_token=None,
max_results=None, extra_params=None):
super(RowIterator, self).__init__(
client, api_request, path, item_to_value=_item_to_row,
items_key='rows', page_token=page_token, max_results=max_results,
extra_params=extra_params, page_start=_rows_page_start,
next_token='pageToken')
self._schema = schema
self._field_to_index = _field_to_index_mapping(schema)
self._total_rows = None

@property
def schema(self):
"""Schema for the table containing the rows

Returns:
list of :class:`~google.cloud.bigquery.schema.SchemaField`:
fields describing the schema
"""
return list(self._schema)

@property
def total_rows(self):
"""The total number of rows in the table.

Returns:
int: the row count.
"""
return self._total_rows

def to_dataframe(self):
"""Create a pandas DataFrame from the query results.

Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
from the destination table's schema.

Raises:
ValueError: If the `pandas` library cannot be imported.

"""
if pandas is None:
raise ValueError('The pandas library is not installed, please '
'install pandas to use the to_dataframe() '
'function.')

column_headers = [field.name for field in self.schema]
rows = [row.values() for row in iter(self)]

return pandas.DataFrame(rows, columns=column_headers)
4 changes: 2 additions & 2 deletions bigquery/nox.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def default(session):
"""
# Install all test dependencies, then install this package in-place.
session.install('mock', 'pytest', 'pytest-cov', *LOCAL_DEPS)
session.install('-e', '.')
session.install('-e', '.[pandas]')

This comment was marked as spam.

This comment was marked as spam.


# Run py.test against the unit tests.
session.run(
Expand Down Expand Up @@ -89,7 +89,7 @@ def system(session, py):
os.path.join('..', 'storage'),
os.path.join('..', 'test_utils'),
)
session.install('-e', '.')
session.install('-e', '.[pandas]')

# Run py.test against the system tests.
session.run(
Expand Down
5 changes: 5 additions & 0 deletions bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
'requests >= 2.18.0',
]

EXTRAS_REQUIREMENTS = {
'pandas': ['pandas >= 0.17.1'],
}

setup(
name='google-cloud-bigquery',
version='0.28.1.dev1',
Expand All @@ -69,5 +73,6 @@
],
packages=find_packages(exclude=('tests*',)),
install_requires=REQUIREMENTS,
extras_require=EXTRAS_REQUIREMENTS,
**SETUP_BASE
)
76 changes: 76 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import uuid

import six
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.api_core.exceptions import PreconditionFailed
from google.cloud import bigquery
Expand Down Expand Up @@ -1244,6 +1248,28 @@ def test_query_iter(self):
row_tuples = [r.values() for r in query_job]
self.assertEqual(row_tuples, [(1,)])

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_query_results_to_dataframe(self):
QUERY = """
SELECT id, author, time_ts, dead
from `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

df = Config.CLIENT.query(QUERY).result().to_dataframe()

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ['id', 'author', 'time_ts', 'dead']
self.assertEqual(list(df), column_names) # verify the column names
exp_datatypes = {'id': int, 'author': str,
'time_ts': pandas.Timestamp, 'dead': bool}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

def test_query_table_def(self):
gs_url = self._write_csv_to_storage(
'bq_external_test' + unique_resource_id(), 'person_ages.csv',
Expand Down Expand Up @@ -1419,6 +1445,56 @@ def test_create_table_rows_fetch_nested_schema(self):
e_favtime = datetime.datetime(*parts[0:6])
self.assertEqual(found[7], e_favtime)

def _fetch_dataframe(self, query):
return Config.CLIENT.query(query).result().to_dataframe()

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_nested_table_to_dataframe(self):
SF = bigquery.SchemaField
schema = [
SF('string_col', 'STRING', mode='NULLABLE'),
SF('record_col', 'RECORD', mode='NULLABLE', fields=[
SF('nested_string', 'STRING', mode='NULLABLE'),
SF('nested_repeated', 'INTEGER', mode='REPEATED'),
SF('nested_record', 'RECORD', mode='NULLABLE', fields=[
SF('nested_nested_string', 'STRING', mode='NULLABLE'),
]),
]),
]
record = {
'nested_string': 'another string value',
'nested_repeated': [0, 1, 2],
'nested_record': {'nested_nested_string': 'some deep insight'},
}
to_insert = [
('Some value', record)
]
table_id = 'test_table'
dataset = self.temp_dataset(_make_dataset_id('nested_df'))
table_arg = Table(dataset.table(table_id), schema=schema)
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)
Config.CLIENT.create_rows(table, to_insert)
QUERY = 'SELECT * from `{}.{}.{}`'.format(
Config.CLIENT.project, dataset.dataset_id, table_id)

retry = RetryResult(_has_rows, max_tries=8)
df = retry(self._fetch_dataframe)(QUERY)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 1) # verify the number of rows
exp_columns = ['string_col', 'record_col']
self.assertEqual(list(df), exp_columns) # verify the column names
row = df.iloc[0]
# verify the row content
self.assertEqual(row['string_col'], 'Some value')
self.assertEqual(row['record_col'], record)
# verify that nested data can be accessed with indices/keys
self.assertEqual(row['record_col']['nested_repeated'][0], 0)
self.assertEqual(
row['record_col']['nested_record']['nested_nested_string'],
'some deep insight')

def temp_dataset(self, dataset_id):
dataset = retry_403(Config.CLIENT.create_dataset)(
Dataset(Config.CLIENT.dataset(dataset_id)))
Expand Down
39 changes: 39 additions & 0 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

from six.moves import http_client
import unittest
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig
from google.cloud.bigquery.job import LoadJobConfig
Expand Down Expand Up @@ -2720,6 +2724,41 @@ def test_reload_w_alternate_client(self):
self.assertEqual(req['path'], PATH)
self._verifyResourceProperties(job, RESOURCE)

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_to_dataframe(self):
begun_resource = self._make_resource()
query_resource = {
'jobComplete': True,
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_ID,
},
'schema': {
'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
],
},
'rows': [
{'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
{'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
{'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]},
{'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]},
],
}
done_resource = copy.deepcopy(begun_resource)
done_resource['status'] = {'state': 'DONE'}
connection = _Connection(
begun_resource, query_resource, done_resource, query_resource)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)

df = job.to_dataframe()

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 4) # verify the number of rows
self.assertEqual(list(df), ['name', 'age']) # verify the column names

def test_iter(self):
import types

Expand Down
Loading