Skip to content

Commit

Permalink
moves to_dataframe() to RowIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
alixhami committed Nov 14, 2017
1 parent 51801f3 commit 1d58a73
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 99 deletions.
27 changes: 0 additions & 27 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

import six
from six.moves import http_client
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

import google.api_core.future.polling
from google.cloud import exceptions
Expand Down Expand Up @@ -1953,29 +1949,6 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
return self._client.list_rows(dest_table, selected_fields=schema,
retry=retry)

def to_dataframe(self):
    """Return this job's query results as a :class:`pandas.DataFrame`.

    Returns:
        A :class:`~pandas.DataFrame` populated with row data and column
        headers from the query results. The column headers are derived
        from the destination table's schema.

    Raises:
        ValueError: If the `pandas` library cannot be imported.
    """
    # ``pandas`` is an optional dependency; fail loudly if it is absent.
    if pandas is None:
        raise ValueError('The pandas library is not installed, please '
                         'install pandas to use the to_dataframe() '
                         'function.')

    results = self.result()
    headers = [field.name for field in results.schema]
    data = [row.values() for row in results]
    return pandas.DataFrame(data, columns=headers)

def __iter__(self):
    """Iterate over the rows of the completed query's result set."""
    rows = self.result()
    return iter(rows)

Expand Down
26 changes: 26 additions & 0 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import operator

import six
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.api_core.page_iterator import HTTPIterator

Expand Down Expand Up @@ -869,3 +873,25 @@ def total_rows(self):
int: the row count.
"""
return self._total_rows

def to_dataframe(self):
    """Create a pandas DataFrame from all rows of this iterator.

    Returns:
        A :class:`~pandas.DataFrame` populated with row data and column
        headers from the rows. The column headers are derived from this
        iterator's schema (``self.schema``), not from any job.

    Raises:
        ValueError: If the `pandas` library cannot be imported.
    """
    if pandas is None:
        raise ValueError('The pandas library is not installed, please '
                         'install pandas to use the to_dataframe() '
                         'function.')

    column_headers = [field.name for field in self.schema]
    # ``self`` is already iterable (it defines ``__iter__``); the previous
    # ``iter(self)`` wrapper was redundant. Iterating fetches every
    # remaining page of results.
    rows = [row.values() for row in self]

    return pandas.DataFrame(rows, columns=column_headers)
2 changes: 1 addition & 1 deletion bigquery/nox.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def system(session, py):
os.path.join('..', 'storage'),
os.path.join('..', 'test_utils'),
)
session.install('-e', '.')
session.install('-e', '.[pandas]')

# Run py.test against the system tests.
session.run(
Expand Down
20 changes: 20 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import uuid

import six
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.api_core.exceptions import PreconditionFailed
from google.cloud import bigquery
Expand Down Expand Up @@ -1242,6 +1246,22 @@ def test_query_iter(self):
row_tuples = [r.values() for r in query_job]
self.assertEqual(row_tuples, [(1,)])

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_query_results_to_dataframe(self):
    """System test: query results convert to a pandas DataFrame."""
    row_limit = 1000
    sql = 'SELECT year, weight_pounds from `{}.{}.{}` LIMIT {}'.format(
        'bigquery-public-data', 'samples', 'natality', row_limit)

    df = Config.CLIENT.query(sql).result().to_dataframe()

    self.assertIsInstance(df, pandas.DataFrame)
    # One DataFrame row per result row, capped by the LIMIT clause.
    self.assertEqual(len(df), row_limit)
    # Iterating a DataFrame yields its column labels.
    self.assertEqual(list(df), ['year', 'weight_pounds'])

def test_query_table_def(self):
gs_url = self._write_csv_to_storage(
'bq_external_test' + unique_resource_id(), 'person_ages.csv',
Expand Down
71 changes: 0 additions & 71 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2724,77 +2724,6 @@ def test_reload_w_alternate_client(self):
self.assertEqual(req['path'], PATH)
self._verifyResourceProperties(job, RESOURCE)

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_to_dataframe(self):
    """to_dataframe() builds one frame row per result row."""
    people = [
        ('Phred Phlyntstone', '32'),
        ('Bharney Rhubble', '33'),
        ('Wylma Phlyntstone', '29'),
        ('Bhettye Rhubble', '27'),
    ]
    query_resource = {
        'jobComplete': True,
        'jobReference': {
            'projectId': self.PROJECT,
            'jobId': self.JOB_ID,
        },
        'schema': {
            'fields': [
                {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ],
        },
        'rows': [
            {'f': [{'v': name}, {'v': age}]} for name, age in people
        ],
    }
    begun_resource = self._make_resource()
    done_resource = copy.deepcopy(begun_resource)
    done_resource['status'] = {'state': 'DONE'}
    connection = _Connection(
        begun_resource, query_resource, done_resource, query_resource)
    client = _make_client(project=self.PROJECT, connection=connection)
    job = self._make_one(self.JOB_ID, self.QUERY, client)

    df = job.to_dataframe()

    self.assertIsInstance(df, pandas.DataFrame)
    self.assertEqual(len(df), len(people))  # verify the number of rows
    self.assertEqual(list(df), ['name', 'age'])  # verify the column names

@unittest.skipIf(pandas is None, 'Requires `pandas`')
def test_to_dataframe_w_empty_results(self):
    """An empty result set still yields a frame with schema columns."""
    query_resource = {
        'jobComplete': True,
        'jobReference': {
            'projectId': self.PROJECT,
            'jobId': self.JOB_ID,
        },
        'schema': {
            'fields': [
                {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ],
        },
    }
    begun_resource = self._make_resource()
    done_resource = copy.deepcopy(begun_resource)
    done_resource['status'] = {'state': 'DONE'}
    connection = _Connection(
        begun_resource, query_resource, done_resource, query_resource)
    client = _make_client(project=self.PROJECT, connection=connection)
    job = self._make_one(self.JOB_ID, self.QUERY, client)

    df = job.to_dataframe()

    self.assertIsInstance(df, pandas.DataFrame)
    self.assertEqual(len(df), 0)  # no rows at all
    self.assertEqual(list(df), ['name', 'age'])  # columns from the schema

@mock.patch('google.cloud.bigquery.job.pandas', new=None)
def test_to_dataframe_error_if_pandas_is_none(self):
    """With the pandas import patched out, to_dataframe() must fail fast."""
    conn = _Connection({})
    fake_client = _make_client(project=self.PROJECT, connection=conn)
    job = self._make_one(self.JOB_ID, self.QUERY, fake_client)

    self.assertRaises(ValueError, job.to_dataframe)

def test_iter(self):
import types

Expand Down
145 changes: 145 additions & 0 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
import unittest

import mock
import six
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

from google.cloud.bigquery.dataset import DatasetReference

Expand Down Expand Up @@ -773,3 +778,143 @@ def test_row(self):
row.z
with self.assertRaises(KeyError):
row['z']


class TestRowIterator(unittest.TestCase):
    """Unit tests for ``google.cloud.bigquery.table.RowIterator``."""

    def test_constructor(self):
        """A new iterator exposes the expected defaults and helpers."""
        from google.cloud.bigquery.table import RowIterator
        from google.cloud.bigquery._helpers import _item_to_row
        from google.cloud.bigquery._helpers import _rows_page_start

        client = mock.sentinel.client
        api_request = mock.sentinel.api_request
        path = '/foo'
        iterator = RowIterator(client, api_request, path)

        self.assertFalse(iterator._started)
        self.assertIs(iterator.client, client)
        self.assertEqual(iterator.path, path)
        self.assertIs(iterator._item_to_value, _item_to_row)
        self.assertEqual(iterator._items_key, 'rows')
        self.assertIsNone(iterator.max_results)
        self.assertEqual(iterator.extra_params, {})
        self.assertEqual(iterator._page_start, _rows_page_start)
        # Changing attributes.
        self.assertEqual(iterator.page_number, 0)
        self.assertIsNone(iterator.next_page_token)
        self.assertEqual(iterator.num_results, 0)

    def test_iterate(self):
        """Rows come back in order and ``num_results`` tracks progress."""
        from google.cloud.bigquery.table import RowIterator
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery._helpers import _field_to_index_mapping

        schema = [
            SchemaField('name', 'STRING', mode='REQUIRED'),
            SchemaField('age', 'INTEGER', mode='REQUIRED')
        ]
        rows = [
            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
        ]
        path = '/foo'
        api_request = mock.Mock(return_value={'rows': rows})
        row_iterator = RowIterator(
            mock.sentinel.client, api_request, path=path)
        row_iterator._schema = schema
        row_iterator._field_to_index = _field_to_index_mapping(schema)
        self.assertEqual(row_iterator.num_results, 0)

        rows_iter = iter(row_iterator)

        # NOTE: a stray debug ``print(val1)`` was removed here, and
        # ``six.next`` replaced with the equivalent builtin ``next``
        # (available on all supported Python versions).
        val1 = next(rows_iter)
        self.assertEqual(val1.name, 'Phred Phlyntstone')
        self.assertEqual(row_iterator.num_results, 1)

        val2 = next(rows_iter)
        self.assertEqual(val2.name, 'Bharney Rhubble')
        self.assertEqual(row_iterator.num_results, 2)

        with self.assertRaises(StopIteration):
            next(rows_iter)

        api_request.assert_called_once_with(
            method='GET', path=path, query_params={})

    @unittest.skipIf(pandas is None, 'Requires `pandas`')
    def test_to_dataframe(self):
        """to_dataframe() builds one frame row per result row."""
        from google.cloud.bigquery.table import RowIterator
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery._helpers import _field_to_index_mapping

        schema = [
            SchemaField('name', 'STRING', mode='REQUIRED'),
            SchemaField('age', 'INTEGER', mode='REQUIRED')
        ]
        rows = [
            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
            {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]},
            {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]},
        ]
        path = '/foo'
        api_request = mock.Mock(return_value={'rows': rows})
        row_iterator = RowIterator(
            mock.sentinel.client, api_request, path=path)
        row_iterator._schema = schema
        row_iterator._field_to_index = _field_to_index_mapping(schema)

        df = row_iterator.to_dataframe()

        self.assertIsInstance(df, pandas.DataFrame)
        self.assertEqual(len(df), 4)  # verify the number of rows
        self.assertEqual(list(df), ['name', 'age'])  # verify the column names

    @unittest.skipIf(pandas is None, 'Requires `pandas`')
    def test_to_dataframe_w_empty_results(self):
        """An empty result set still yields a frame with schema columns."""
        from google.cloud.bigquery.table import RowIterator
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery._helpers import _field_to_index_mapping

        schema = [
            SchemaField('name', 'STRING', mode='REQUIRED'),
            SchemaField('age', 'INTEGER', mode='REQUIRED')
        ]
        path = '/foo'
        api_request = mock.Mock(return_value={'rows': []})
        row_iterator = RowIterator(
            mock.sentinel.client, api_request, path=path)
        row_iterator._schema = schema
        row_iterator._field_to_index = _field_to_index_mapping(schema)

        df = row_iterator.to_dataframe()

        self.assertIsInstance(df, pandas.DataFrame)
        self.assertEqual(len(df), 0)  # verify the number of rows
        self.assertEqual(list(df), ['name', 'age'])  # verify the column names

    @mock.patch('google.cloud.bigquery.table.pandas', new=None)
    def test_to_dataframe_error_if_pandas_is_none(self):
        """With pandas patched out, to_dataframe() raises ValueError."""
        from google.cloud.bigquery.table import RowIterator
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery._helpers import _field_to_index_mapping

        schema = [
            SchemaField('name', 'STRING', mode='REQUIRED'),
            SchemaField('age', 'INTEGER', mode='REQUIRED')
        ]
        rows = [
            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
        ]
        path = '/foo'
        api_request = mock.Mock(return_value={'rows': rows})
        row_iterator = RowIterator(
            mock.sentinel.client, api_request, path=path)
        row_iterator._schema = schema
        row_iterator._field_to_index = _field_to_index_mapping(schema)

        with self.assertRaises(ValueError):
            row_iterator.to_dataframe()

0 comments on commit 1d58a73

Please sign in to comment.