Marshal row data correctly in 'Table.insert_data()' #3426

Merged: 4 commits, May 18, 2017
92 changes: 57 additions & 35 deletions bigquery/google/cloud/bigquery/_helpers.py
@@ -21,6 +21,7 @@
from google.cloud._helpers import UTC
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _time_from_iso8601_time_naive
from google.cloud._helpers import _to_bytes
@@ -122,6 +123,38 @@ def _record_from_json(value, field):
}


def _row_from_json(row, schema):
"""Convert JSON row data to row with appropriate types.

Note: ``row['f']`` and ``schema`` are presumed to be of the same length.

:type row: dict
:param row: A JSON response row to be converted.

:type schema: tuple
:param schema: A tuple of
:class:`~google.cloud.bigquery.schema.SchemaField`.

:rtype: tuple
:returns: A tuple of data converted to native types.
"""
row_data = []
for field, cell in zip(schema, row['f']):
converter = _CELLDATA_FROM_JSON[field.field_type]
if field.mode == 'REPEATED':
row_data.append([converter(item['v'], field)
for item in cell['v']])
else:
row_data.append(converter(cell['v'], field))

return tuple(row_data)


def _rows_from_json(rows, schema):
"""Convert JSON row data to rows with appropriate types."""
return [_row_from_json(row, schema) for row in rows]
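
To make the expected shapes concrete, here is a minimal sketch of the new _row_from_json helper in use; the schema and payload below are invented for illustration, and the cell layout mirrors a tabledata.list response, where every value arrives wrapped in a {'v': ...} mapping as a string:

from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _row_from_json

# Hypothetical two-column schema.
schema = (
    SchemaField('full_name', 'STRING'),
    SchemaField('age', 'INTEGER'),
)

# One row as it appears in the API response: cell values are strings
# wrapped in {'v': ...}, in the same order as the schema.
json_row = {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}

print(_row_from_json(json_row, schema))
# -> ('Phred Phlyntstone', 32)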


def _int_to_json(value):
"""Coerce 'value' to an JSON-compatible representation."""
if isinstance(value, int):
@@ -148,8 +181,11 @@ def _bytes_to_json(value):
return value


def _timestamp_to_json(value):
"""Coerce 'value' to an JSON-compatible representation."""
def _timestamp_to_json_parameter(value):
"""Coerce 'value' to an JSON-compatible representation.

This version returns the string representation used in query parameters.
"""
if isinstance(value, datetime.datetime):
if value.tzinfo not in (None, UTC):
# Convert to UTC and remove the time zone info.
@@ -159,6 +195,16 @@ def _timestamp_to_json(value):
return value


def _timestamp_to_json_row(value):
"""Coerce 'value' to an JSON-compatible representation.

This version returns floating-point seconds value used in row data.
"""
if isinstance(value, datetime.datetime):
value = _microseconds_from_datetime(value) * 1e-6
return value


def _datetime_to_json(value):
"""Coerce 'value' to an JSON-compatible representation."""
if isinstance(value, datetime.datetime):
@@ -180,49 +226,25 @@ def _time_to_json(value):
return value


_SCALAR_VALUE_TO_JSON = {
# Converters used for scalar values marshalled as row data.
_SCALAR_VALUE_TO_JSON_ROW = {
'INTEGER': _int_to_json,
'INT64': _int_to_json,
'FLOAT': _float_to_json,
'FLOAT64': _float_to_json,
'BOOLEAN': _bool_to_json,
'BOOL': _bool_to_json,
'BYTES': _bytes_to_json,
'TIMESTAMP': _timestamp_to_json,
'TIMESTAMP': _timestamp_to_json_row,
'DATETIME': _datetime_to_json,
'DATE': _date_to_json,
'TIME': _time_to_json,
}


def _row_from_json(row, schema):
"""Convert JSON row data to row with appropriate types.

:type row: dict
:param row: A JSON response row to be converted.

:type schema: tuple
:param schema: A tuple of
:class:`~google.cloud.bigquery.schema.SchemaField`.

:rtype: tuple
:returns: A tuple of data converted to native types.
"""
row_data = []
for field, cell in zip(schema, row['f']):
converter = _CELLDATA_FROM_JSON[field.field_type]
if field.mode == 'REPEATED':
row_data.append([converter(item['v'], field)
for item in cell['v']])
else:
row_data.append(converter(cell['v'], field))

return tuple(row_data)


def _rows_from_json(rows, schema):
"""Convert JSON row data to rows with appropriate types."""
return [_row_from_json(row, schema) for row in rows]
# Converters used for scalar values marshalled as query parameters.
_SCALAR_VALUE_TO_JSON_PARAM = _SCALAR_VALUE_TO_JSON_ROW.copy()
_SCALAR_VALUE_TO_JSON_PARAM['TIMESTAMP'] = _timestamp_to_json_parameter
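
The two maps differ only in their TIMESTAMP entry: row data carries epoch seconds as a float, while a query parameter carries a timestamp string. A short sketch of both converters applied to the same value (the datetime is borrowed from the unit tests; the exact parameter string is whatever _timestamp_to_json_parameter produces, shown here only approximately):

import datetime
from google.cloud.bigquery._helpers import (
    _timestamp_to_json_parameter, _timestamp_to_json_row)

when = datetime.datetime(2016, 12, 20, 15, 58, 27, 339328)  # naive => UTC

print(_timestamp_to_json_row(when))
# -> 1482249507.339328 (float seconds since the epoch)

print(_timestamp_to_json_parameter(when))
# -> a string along the lines of '2016-12-20 15:58:27.339328+00:00'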


class _ConfigurationProperty(object):
@@ -420,7 +442,7 @@ def to_api_repr(self):
:returns: JSON mapping
"""
value = self.value
converter = _SCALAR_VALUE_TO_JSON.get(self.type_)
converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.type_)
if converter is not None:
value = converter(value)
resource = {
@@ -506,7 +528,7 @@ def to_api_repr(self):
a_values = [repr_['parameterValue'] for repr_ in reprs]
else:
a_type = {'type': self.array_type}
converter = _SCALAR_VALUE_TO_JSON.get(self.array_type)
converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.array_type)
if converter is not None:
values = [converter(value) for value in values]
a_values = [{'value': value} for value in values]
@@ -600,7 +622,7 @@ def to_api_repr(self):
values[name] = repr_['parameterValue']
else:
s_types[name] = {'name': name, 'type': {'type': type_}}
converter = _SCALAR_VALUE_TO_JSON.get(type_)
converter = _SCALAR_VALUE_TO_JSON_PARAM.get(type_)
if converter is not None:
value = converter(value)
values[name] = {'value': value}
22 changes: 8 additions & 14 deletions bigquery/google/cloud/bigquery/table.py
@@ -22,18 +22,18 @@
import six

from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _millis_from_datetime
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import make_exception
from google.cloud.iterator import HTTPIterator
from google.cloud.streaming.exceptions import HttpError
from google.cloud.streaming.http_wrapper import Request
from google.cloud.streaming.http_wrapper import make_api_request
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
from google.cloud.streaming.transfer import Upload
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _row_from_json
from google.cloud.iterator import HTTPIterator
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -673,6 +673,9 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
(this is distinct from the total number of rows in the
current page: ``iterator.page.num_items``).
"""
if len(self._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

client = self._require_client(client)
path = '%s/data' % (self.path,)
iterator = HTTPIterator(client=client, path=path,
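
With this guard, fetching from a table whose schema has not been loaded now fails fast with a clear error. A sketch, where dataset stands for any Dataset bound to a client (names are invented for illustration):

table = dataset.table('person_ages')   # schema never set or reloaded
try:
    table.fetch_data()
except ValueError as exc:
    print(exc)   # Table has no schema: call 'table.reload()'
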
@@ -741,11 +744,9 @@ def insert_data(self,
row_info = {}

for field, value in zip(self._schema, row):
if field.field_type == 'TIMESTAMP':
# BigQuery stores TIMESTAMP data internally as a
# UNIX timestamp with microsecond precision.
# Specifies the number of seconds since the epoch.
value = _convert_timestamp(value)
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
row_info[field.name] = value

info = {'json': row_info}
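
The marshalling loop above is what changes the wire format: integers go out as strings, booleans as 'true'/'false', timestamps as float epoch seconds, and STRING values pass through untouched. Below is a standalone sketch of the same loop with an invented schema and row, so the resulting payload is visible; it matches the updated test expectations further down:

import datetime

from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW

schema = [
    SchemaField('full_name', 'STRING'),
    SchemaField('age', 'INTEGER'),
    SchemaField('joined', 'TIMESTAMP'),
]
row = ('Phred Phlyntstone', 32,
       datetime.datetime(2016, 12, 20, 15, 58, 27, 339328))  # naive => UTC

row_info = {}
for field, value in zip(schema, row):
    converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
    if converter is not None:   # STRING has no converter registered
        value = converter(value)
    row_info[field.name] = value

print(row_info)
# {'full_name': 'Phred Phlyntstone', 'age': '32', 'joined': 1482249507.339328}
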
@@ -1128,10 +1129,3 @@ class _UrlBuilder(object):
def __init__(self):
self.query_params = {}
self._relative_path = ''


def _convert_timestamp(value):
"""Helper for :meth:`Table.insert_data`."""
if isinstance(value, datetime.datetime):
value = _microseconds_from_datetime(value) * 1e-6
return value
29 changes: 26 additions & 3 deletions bigquery/tests/unit/test__helpers.py
@@ -561,12 +561,12 @@ def test_w_bytes(self):
self.assertEqual(converted, expected)


class Test_timestamp_to_json(unittest.TestCase):
class Test_timestamp_to_json_parameter(unittest.TestCase):

def _call_fut(self, value):
from google.cloud.bigquery._helpers import _timestamp_to_json
from google.cloud.bigquery._helpers import _timestamp_to_json_parameter

return _timestamp_to_json(value)
return _timestamp_to_json_parameter(value)

def test_w_float(self):
self.assertEqual(self._call_fut(1.234567), 1.234567)
@@ -604,6 +604,29 @@ def test_w_datetime_w_utc_zone(self):
self.assertEqual(self._call_fut(when), ZULU)


class Test_timestamp_to_json_row(unittest.TestCase):

def _call_fut(self, value):
from google.cloud.bigquery._helpers import _timestamp_to_json_row

return _timestamp_to_json_row(value)

def test_w_float(self):
self.assertEqual(self._call_fut(1.234567), 1.234567)

def test_w_string(self):
ZULU = '2016-12-20 15:58:27.339328+00:00'
self.assertEqual(self._call_fut(ZULU), ZULU)

def test_w_datetime(self):
import datetime
from google.cloud._helpers import _microseconds_from_datetime

when = datetime.datetime(2016, 12, 20, 15, 58, 27, 339328)
self.assertEqual(
self._call_fut(when), _microseconds_from_datetime(when) / 1e6)


class Test_datetime_to_json(unittest.TestCase):

def _call_fut(self, value):
26 changes: 24 additions & 2 deletions bigquery/tests/unit/test_table.py
@@ -1043,6 +1043,24 @@ def test_delete_w_alternate_client(self):
self.assertEqual(req['method'], 'DELETE')
self.assertEqual(req['path'], '/%s' % PATH)

def test_fetch_data_wo_schema(self):
from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA

client = _Client(project=self.PROJECT)
dataset = _Dataset(client)
table = self._make_one(self.TABLE_NAME, dataset=dataset)
ROWS = [
('Phred Phlyntstone', 32),
('Bharney Rhubble', 33),
('Wylma Phlyntstone', 29),
('Bhettye Rhubble', 27),
]

with self.assertRaises(ValueError) as exc:
table.fetch_data()

self.assertEqual(exc.exception.args, (_TABLE_HAS_NO_SCHEMA,))

def test_fetch_data_w_bound_client(self):
import datetime
import six
@@ -1355,7 +1373,7 @@ def _row_data(row):
if isinstance(row[2], datetime.datetime):
joined = _microseconds_from_datetime(joined) * 1e-6
return {'full_name': row[0],
'age': row[1],
'age': str(row[1]),
'joined': joined}

SENT = {
@@ -1404,7 +1422,11 @@ def test_insert_data_w_alternate_client(self):
]

def _row_data(row):
return {'full_name': row[0], 'age': row[1], 'voter': row[2]}
return {
'full_name': row[0],
'age': str(row[1]),
'voter': row[2] and 'true' or 'false',
}

SENT = {
'skipInvalidRows': True,