BigQuery: Replace table.insert_data() with client.create_rows() (#4151)
* replaces table.insert_data() with client.create_rows()

* client.create_rows() accepts list of dicts as rows parameter

* adds system test for rows given as list of dictionaries to create_rows()

* adds test for create_rows() with list of Rows

* removes unused test function

* client.create_rows() accepts TableReference
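As a quick orientation, here is a minimal before/after sketch of the migration this commit performs. The dataset and table names are hypothetical, and the commented-out call shows the pre-commit API:

from google.cloud import bigquery

client = bigquery.Client()
# Hypothetical dataset/table; get_table() fetches the schema, which
# create_rows() needs when given a full Table object.
table = client.get_table(client.dataset('my_dataset').table('people'))
rows = [(u'Phred Phlyntstone', 32), (u'Wylma Phlyntstone', 29)]

# Before this commit (removed from table.py below):
#     errors = table.insert_data(rows)
# After this commit, the streaming insert lives on the client:
errors = client.create_rows(table, rows)
assert errors == []  # an empty list means every row was accepted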
alixhami authored Oct 11, 2017
1 parent ce8deae commit 403bfc4
Showing 5 changed files with 606 additions and 397 deletions.
115 changes: 115 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
@@ -34,6 +34,7 @@
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import Table, _TABLE_HAS_NO_SCHEMA
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import _row_from_mapping
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import LoadJob
@@ -42,6 +43,7 @@
from google.cloud.bigquery._helpers import _item_to_row
from google.cloud.bigquery._helpers import _rows_page_start
from google.cloud.bigquery._helpers import _field_to_index_mapping
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
@@ -832,6 +834,119 @@ def query(self, query, job_config=None, job_id=None):
job.begin()
return job

def create_rows(self, table, rows, row_ids=None, selected_fields=None,
skip_invalid_rows=None, ignore_unknown_values=None,
template_suffix=None):
"""API call: insert table data via a POST request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
:type table: One of:
:class:`~google.cloud.bigquery.table.Table`
:class:`~google.cloud.bigquery.table.TableReference`
:param table: the destination table for the row data, or a reference
to it.
:type rows: One of:
list of tuples
list of dictionaries
:param rows: Row data to be inserted. If a list of tuples is given,
each tuple should contain data for each schema field on
the current table and in the same order as the schema
fields. If a list of dictionaries is given, the keys must
include all required fields in the schema. Keys which do
not correspond to a field in the schema are ignored.
:type row_ids: list of string
:param row_ids: (Optional) Unique ids, one per row being inserted.
If not passed, no de-duplication occurs.
:type selected_fields: list of :class:`SchemaField`
:param selected_fields:
    The schema fields used to serialize the rows. Required if
    ``table`` is a :class:`~google.cloud.bigquery.table.TableReference`.
:type skip_invalid_rows: bool
:param skip_invalid_rows: (Optional) Insert all valid rows of a
request, even if invalid rows exist.
The default value is False, which causes
the entire request to fail if any invalid
rows exist.
:type ignore_unknown_values: bool
:param ignore_unknown_values: (Optional) Accept rows that contain
values that do not match the schema.
The unknown values are ignored. Default
is False, which treats unknown values as
errors.
:type template_suffix: str
:param template_suffix:
(Optional) treat ``table`` as a template table and provide a suffix.
BigQuery will create the table ``<name> + <template_suffix>`` based
on the schema of the template table. See
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
:rtype: list of mappings
:returns: One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list
of the mappings describing one or more problems with the
row.
:raises: ValueError if the table's schema is not set, or if ``table`` is a
    :class:`~google.cloud.bigquery.table.TableReference` and
    ``selected_fields`` is not provided; TypeError if ``table`` is neither
    a :class:`~google.cloud.bigquery.table.Table` nor a
    :class:`~google.cloud.bigquery.table.TableReference`.
"""
if selected_fields is not None:
schema = selected_fields
elif isinstance(table, TableReference):
raise ValueError('need selected_fields with TableReference')
elif isinstance(table, Table):
if len(table._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)
schema = table.schema
else:
raise TypeError('table should be Table or TableReference')

rows_info = []
data = {'rows': rows_info}

for index, row in enumerate(rows):
if isinstance(row, dict):
row = _row_from_mapping(row, schema)
row_info = {}

for field, value in zip(schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
row_info[field.name] = value

info = {'json': row_info}
if row_ids is not None:
info['insertId'] = row_ids[index]

rows_info.append(info)

if skip_invalid_rows is not None:
data['skipInvalidRows'] = skip_invalid_rows

if ignore_unknown_values is not None:
data['ignoreUnknownValues'] = ignore_unknown_values

if template_suffix is not None:
data['templateSuffix'] = template_suffix

response = self._connection.api_request(
method='POST',
path='%s/insertAll' % table.path,
data=data)
errors = []

for error in response.get('insertErrors', ()):
errors.append({'index': int(error['index']),
'errors': error['errors']})

return errors
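A hedged usage sketch of the method above, exercising both row formats the docstring describes; the dataset, table, and schema here are assumptions for illustration, not part of this diff:

from google.cloud import bigquery
from google.cloud.bigquery import SchemaField

client = bigquery.Client()
table_ref = client.dataset('my_dataset').table('people')
schema = [SchemaField('full_name', 'STRING', mode='REQUIRED'),
          SchemaField('age', 'INTEGER', mode='REQUIRED')]

# Tuples must follow the schema's field order.
errors = client.create_rows(client.get_table(table_ref),
                            [(u'Phred Phlyntstone', 32)])

# Dicts are keyed by field name, so order is irrelevant; a bare
# TableReference works as long as selected_fields supplies the schema.
errors = client.create_rows(table_ref,
                            [{'age': 29, 'full_name': u'Wylma Phlyntstone'}],
                            selected_fields=schema)

# Any rejected rows come back as mappings of 'index' and 'errors'.
for entry in errors:
    print('row %d failed: %s' % (entry['index'], entry['errors']))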

def query_rows(self, query, job_config=None, job_id=None, timeout=None):
"""Start a query job and wait for the results.
172 changes: 31 additions & 141 deletions bigquery/google/cloud/bigquery/table.py
@@ -23,7 +23,6 @@
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _millis_from_datetime
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW


_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'client.get_table()'"
@@ -554,21 +553,6 @@ def from_api_repr(cls, resource, client):
table._set_properties(resource)
return table

def _require_client(self, client):
"""Check client or verify over-ride.
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
:rtype: :class:`google.cloud.bigquery.client.Client`
:returns: The client passed in or the currently bound client.
"""
if client is None:
client = self._client
return client

def _set_properties(self, api_response):
"""Update properties from resource in body of ``api_response``
@@ -642,131 +626,37 @@ def _build_resource(self, filter_fields):
resource[api_field] = getattr(self, f)
return resource

def row_from_mapping(self, mapping):
"""Convert a mapping to a row tuple using the schema.
:type mapping: dict
:param mapping: Mapping of row data: must contain keys for all
required fields in the schema. Keys which do not correspond
to a field in the schema are ignored.
:rtype: tuple
:returns: Tuple whose elements are ordered according to the table's
schema.
:raises: ValueError if table's schema is not set
"""
if len(self._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

row = []
for field in self.schema:
if field.mode == 'REQUIRED':
row.append(mapping[field.name])
elif field.mode == 'REPEATED':
row.append(mapping.get(field.name, ()))
elif field.mode == 'NULLABLE':
row.append(mapping.get(field.name))
else:
raise ValueError(
"Unknown field mode: {}".format(field.mode))
return tuple(row)

def insert_data(self,
rows,
row_ids=None,
skip_invalid_rows=None,
ignore_unknown_values=None,
template_suffix=None,
client=None):
"""API call: insert table data via a POST request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
:type rows: list of tuples
:param rows: Row data to be inserted. Each tuple should contain data
for each schema field on the current table and in the
same order as the schema fields.
:type row_ids: list of string
:param row_ids: Unique ids, one per row being inserted. If not
passed, no de-duplication occurs.
:type skip_invalid_rows: bool
:param skip_invalid_rows: (Optional) Insert all valid rows of a
request, even if invalid rows exist.
The default value is False, which causes
the entire request to fail if any invalid
rows exist.
:type ignore_unknown_values: bool
:param ignore_unknown_values: (Optional) Accept rows that contain
values that do not match the schema.
The unknown values are ignored. Default
is False, which treats unknown values as
errors.
:type template_suffix: str
:param template_suffix:
(Optional) treat ``name`` as a template table and provide a suffix.
BigQuery will create the table ``<name> + <template_suffix>`` based
on the schema of the template table. See
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
:rtype: list of mappings
:returns: One mapping per row with insert errors: the "index" key
identifies the row, and the "errors" key contains a list
of the mappings describing one or more problems with the
row.
:raises: ValueError if table's schema is not set
"""
if len(self._schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

client = self._require_client(client)
rows_info = []
data = {'rows': rows_info}

for index, row in enumerate(rows):
row_info = {}

for field, value in zip(self._schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
row_info[field.name] = value

info = {'json': row_info}
if row_ids is not None:
info['insertId'] = row_ids[index]

rows_info.append(info)

if skip_invalid_rows is not None:
data['skipInvalidRows'] = skip_invalid_rows

if ignore_unknown_values is not None:
data['ignoreUnknownValues'] = ignore_unknown_values

if template_suffix is not None:
data['templateSuffix'] = template_suffix

response = client._connection.api_request(
method='POST',
path='%s/insertAll' % self.path,
data=data)
errors = []

for error in response.get('insertErrors', ()):
errors.append({'index': int(error['index']),
'errors': error['errors']})

return errors

def _row_from_mapping(mapping, schema):
"""Convert a mapping to a row tuple using the schema.
:type mapping: dict
:param mapping: Mapping of row data: must contain keys for all
required fields in the schema. Keys which do not correspond
to a field in the schema are ignored.
:type schema: list of :class:`SchemaField`
:param schema: The schema of the table destination for the rows
:rtype: tuple
:returns: Tuple whose elements are ordered according to the schema.
:raises: ValueError if schema is empty
"""
if len(schema) == 0:
raise ValueError(_TABLE_HAS_NO_SCHEMA)

row = []
for field in schema:
if field.mode == 'REQUIRED':
row.append(mapping[field.name])
elif field.mode == 'REPEATED':
row.append(mapping.get(field.name, ()))
elif field.mode == 'NULLABLE':
row.append(mapping.get(field.name))
else:
raise ValueError(
"Unknown field mode: {}".format(field.mode))
return tuple(row)
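A small worked example of the helper above, under an assumed three-field schema: REQUIRED keys must be present (a missing one raises KeyError), NULLABLE keys default to None, and REPEATED keys default to an empty tuple:

from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _row_from_mapping

schema = [SchemaField('full_name', 'STRING', mode='REQUIRED'),
          SchemaField('age', 'INTEGER', mode='NULLABLE'),
          SchemaField('aliases', 'STRING', mode='REPEATED')]

# 'shoe_size' is not in the schema, so it is silently dropped;
# 'age' and 'aliases' are absent, so they fall back to None and ().
row = _row_from_mapping(
    {'full_name': u'Phred Phlyntstone', 'shoe_size': 9.5}, schema)
assert row == (u'Phred Phlyntstone', None, ())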


def _parse_schema_resource(info):
