BigQuery: Replace table.insert_data() with client.create_rows() #4151

Merged 6 commits on Oct 11, 2017 (changes shown from 5 commits)
97 changes: 97 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
@@ -42,6 +42,7 @@
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start
from google.cloud.bigquery._helpers import _field_to_index_mapping
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
@@ -832,6 +833,102 @@ def query(self, query, job_config=None, job_id=None):
job.begin()
return job

def create_rows(self, table, rows, row_ids=None, skip_invalid_rows=None,
ignore_unknown_values=None, template_suffix=None):
"""API call: insert table data via a POST request

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

        :type table: :class:`~google.cloud.bigquery.table.Table`
        :param table: the destination table for the row data.

:type rows: One of:
list of tuples
list of dictionaries
:param rows: Row data to be inserted. If a list of tuples is given,
each tuple should contain data for each schema field on
the current table and in the same order as the schema
fields. If a list of dictionaries is given, the keys must
include all required fields in the schema. Keys which do
not correspond to a field in the schema are ignored.

:type row_ids: list of string
:param row_ids: (Optional) Unique ids, one per row being inserted.
If not passed, no de-duplication occurs.

:type skip_invalid_rows: bool
:param skip_invalid_rows: (Optional) Insert all valid rows of a
request, even if invalid rows exist.
The default value is False, which causes
the entire request to fail if any invalid
rows exist.

:type ignore_unknown_values: bool
:param ignore_unknown_values: (Optional) Accept rows that contain
values that do not match the schema.
The unknown values are ignored. Default
is False, which treats unknown values as
errors.

:type template_suffix: str
:param template_suffix:
            (Optional) treat ``table`` as a template table and provide a suffix.
            BigQuery will create the table ``<table> + <template_suffix>`` based
on the schema of the template table. See
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables

:rtype: list of mappings
:returns: One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list
of the mappings describing one or more problems with the
row.
:raises: ValueError if table's schema is not set
"""
if len(table._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

rows_info = []
data = {'rows': rows_info}

for index, row in enumerate(rows):
if isinstance(row, dict):
row = table.row_from_mapping(row)

row_info = {}

for field, value in zip(table._schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
row_info[field.name] = value

info = {'json': row_info}
if row_ids is not None:
info['insertId'] = row_ids[index]

rows_info.append(info)

if skip_invalid_rows is not None:
data['skipInvalidRows'] = skip_invalid_rows

if ignore_unknown_values is not None:
data['ignoreUnknownValues'] = ignore_unknown_values

if template_suffix is not None:
data['templateSuffix'] = template_suffix

response = self._connection.api_request(
method='POST',
path='%s/insertAll' % table.path,
data=data)
errors = []

for error in response.get('insertErrors', ()):
errors.append({'index': int(error['index']),
'errors': error['errors']})

return errors

def query_rows(self, query, job_config=None, job_id=None, timeout=None):
"""Start a query job and wait for the results.

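For reference, a minimal usage sketch of the new client.create_rows() API added above, mirroring the pattern used in the system tests later in this diff. The dataset and table IDs, the schema, and the sample rows are hypothetical, and an authenticated client is assumed:

from google.cloud import bigquery

client = bigquery.Client()
dataset = client.dataset('my_dataset')  # hypothetical dataset, assumed to exist
schema = [
    bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
]
# create_rows() raises ValueError unless the table's schema is set, e.g. by
# constructing the Table with a schema (as here) or fetching it via get_table().
table = client.create_table(
    bigquery.Table(dataset.table('my_table'), schema=schema, client=client))

# Rows may be tuples (ordered like the schema fields) or mappings keyed by
# field name; mappings are converted with Table.row_from_mapping().
rows = [
    ('Phred Phlyntstone', 32),
    {'full_name': 'Wylma Phlyntstone', 'age': 29},
]
errors = client.create_rows(table, rows)
assert errors == []  # an empty list means every row was streamed successfully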
113 changes: 0 additions & 113 deletions bigquery/google/cloud/bigquery/table.py
@@ -23,7 +23,6 @@
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _millis_from_datetime
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'client.get_table()'"
@@ -554,21 +553,6 @@ def from_api_repr(cls, resource, client):
table._set_properties(resource)
return table

def _require_client(self, client):
"""Check client or verify over-ride.

:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: :class:`google.cloud.bigquery.client.Client`
:returns: The client passed in or the currently bound client.
"""
if client is None:
client = self._client
return client

def _set_properties(self, api_response):
"""Update properties from resource in body of ``api_response``

@@ -671,103 +655,6 @@ def row_from_mapping(self, mapping):
"Unknown field mode: {}".format(field.mode))
return tuple(row)

def insert_data(self,
rows,
row_ids=None,
skip_invalid_rows=None,
ignore_unknown_values=None,
template_suffix=None,
client=None):
"""API call: insert table data via a POST request

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

:type rows: list of tuples
:param rows: Row data to be inserted. Each tuple should contain data
for each schema field on the current table and in the
same order as the schema fields.

:type row_ids: list of string
:param row_ids: Unique ids, one per row being inserted. If not
passed, no de-duplication occurs.

:type skip_invalid_rows: bool
:param skip_invalid_rows: (Optional) Insert all valid rows of a
request, even if invalid rows exist.
The default value is False, which causes
the entire request to fail if any invalid
rows exist.

:type ignore_unknown_values: bool
:param ignore_unknown_values: (Optional) Accept rows that contain
values that do not match the schema.
The unknown values are ignored. Default
is False, which treats unknown values as
errors.

:type template_suffix: str
:param template_suffix:
(Optional) treat ``name`` as a template table and provide a suffix.
BigQuery will create the table ``<name> + <template_suffix>`` based
on the schema of the template table. See
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables

:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: list of mappings
:returns: One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list
of the mappings describing one or more problems with the
row.
:raises: ValueError if table's schema is not set
"""
if len(self._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

client = self._require_client(client)
rows_info = []
data = {'rows': rows_info}

for index, row in enumerate(rows):
row_info = {}

for field, value in zip(self._schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
row_info[field.name] = value

info = {'json': row_info}
if row_ids is not None:
info['insertId'] = row_ids[index]

rows_info.append(info)

if skip_invalid_rows is not None:
data['skipInvalidRows'] = skip_invalid_rows

if ignore_unknown_values is not None:
data['ignoreUnknownValues'] = ignore_unknown_values

if template_suffix is not None:
data['templateSuffix'] = template_suffix

response = client._connection.api_request(
method='POST',
path='%s/insertAll' % self.path,
data=data)
errors = []

for error in response.get('insertErrors', ()):
errors.append({'index': int(error['index']),
'errors': error['errors']})

return errors


def _parse_schema_resource(info):
"""Parse a resource fragment into a schema field.
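For callers of the removed method, the migration is a change of entry point only: the row payload, the optional keyword arguments (row_ids, skip_invalid_rows, ignore_unknown_values, template_suffix), and the returned list of error mappings carry over unchanged, and create_rows() additionally accepts mapping rows. An illustrative before/after sketch, assuming client, table, rows, and row_ids already exist:

# Before this change:
errors = table.insert_data(rows, row_ids=row_ids)

# After this change:
errors = client.create_rows(table, rows, row_ids=row_ids)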
69 changes: 53 additions & 16 deletions bigquery/tests/system.py
@@ -318,7 +318,7 @@ def _fetch_single_page(table, selected_fields=None):
page = six.next(iterator.pages)
return list(page)

def test_insert_data_then_dump_table(self):
def test_create_rows_then_dump_table(self):
NOW_SECONDS = 1448911495.484366
NOW = datetime.datetime.utcfromtimestamp(
NOW_SECONDS).replace(tzinfo=UTC)
@@ -330,20 +330,21 @@ def test_insert_data_then_dump_table(self):
]
ROW_IDS = range(len(ROWS))

dataset = self.temp_dataset(_make_dataset_id('insert_data_then_dump'))
TABLE_NAME = 'test_table'
full_name = bigquery.SchemaField('full_name', 'STRING',
mode='REQUIRED')
age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
now = bigquery.SchemaField('now', 'TIMESTAMP')
table_arg = Table(dataset.table(TABLE_NAME),
schema=[full_name, age, now], client=Config.CLIENT)
dataset = self.temp_dataset(_make_dataset_id('create_rows_then_dump'))
TABLE_ID = 'test_table'
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
bigquery.SchemaField('now', 'TIMESTAMP'),
]
table_arg = Table(dataset.table(TABLE_ID), schema=schema,
client=Config.CLIENT)
self.assertFalse(_table_exists(table_arg))
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)
self.assertTrue(_table_exists(table))

errors = table.insert_data(ROWS, ROW_IDS)
errors = Config.CLIENT.create_rows(table, ROWS, ROW_IDS)
self.assertEqual(len(errors), 0)

rows = ()
@@ -1278,7 +1279,7 @@ def test_query_future(self):
row_tuples = [r.values() for r in iterator]
self.assertEqual(row_tuples, [(1,)])

def test_insert_nested_nested(self):
def test_create_rows_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
schema = [
@@ -1299,21 +1300,57 @@ def test_insert_nested_nested(self):
to_insert = [
('Some value', record)
]
table_name = 'test_table'
table_id = 'test_table'
dataset = self.temp_dataset(_make_dataset_id('issue_2951'))
table_arg = Table(dataset.table(table_name), schema=schema,
table_arg = Table(dataset.table(table_id), schema=schema,
client=Config.CLIENT)
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)

table.insert_data(to_insert)
Config.CLIENT.create_rows(table, to_insert)

retry = RetryResult(_has_rows, max_tries=8)
rows = retry(self._fetch_single_page)(table)
row_tuples = [r.values() for r in rows]
self.assertEqual(row_tuples, to_insert)

def test_create_table_insert_fetch_nested_schema(self):
def test_create_rows_nested_nested_dictionary(self):
# See #2951
SF = bigquery.SchemaField
schema = [
SF('string_col', 'STRING', mode='NULLABLE'),
SF('record_col', 'RECORD', mode='NULLABLE', fields=[
SF('nested_string', 'STRING', mode='NULLABLE'),
SF('nested_repeated', 'INTEGER', mode='REPEATED'),
SF('nested_record', 'RECORD', mode='NULLABLE', fields=[
SF('nested_nested_string', 'STRING', mode='NULLABLE'),
]),
]),
]
record = {
'nested_string': 'another string value',
'nested_repeated': [0, 1, 2],
'nested_record': {'nested_nested_string': 'some deep insight'},
}
to_insert = [
{'string_col': 'Some value', 'record_col': record}
]
table_id = 'test_table'
dataset = self.temp_dataset(_make_dataset_id('issue_2951'))
table_arg = Table(dataset.table(table_id), schema=schema,
client=Config.CLIENT)
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)

Config.CLIENT.create_rows(table, to_insert)

retry = RetryResult(_has_rows, max_tries=8)
rows = retry(self._fetch_single_page)(table)
row_tuples = [r.values() for r in rows]
expected_rows = [('Some value', record)]
self.assertEqual(row_tuples, expected_rows)

def test_create_table_rows_fetch_nested_schema(self):
table_name = 'test_table'
dataset = self.temp_dataset(
_make_dataset_id('create_table_nested_schema'))
@@ -1334,7 +1371,7 @@ def test_create_table_insert_fetch_nested_schema(self):
to_insert.append(
tuple(mapping[field.name] for field in schema))

errors = table.insert_data(to_insert)
errors = Config.CLIENT.create_rows(table, to_insert)
self.assertEqual(len(errors), 0)

retry = RetryResult(_has_rows, max_tries=8)