BigQuery: Add Client.insert_rows, deprecate Client.create_rows (#4657)
* BigQuery: Add Client.insert_rows, deprecate Client.create_rows

`insert_rows` aligns better with the underlying API request (Tabledata.insertAll).
Based on feedback from the BigQuery GA review.
tswast authored Jan 2, 2018
1 parent f4e029a commit 55df6a2
Showing 5 changed files with 81 additions and 40 deletions.
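For orientation, a minimal sketch of what the rename means for callers; this is not part of the diff, and the dataset, table, and column names are illustrative:

```python
from google.cloud import bigquery

client = bigquery.Client()
# Hypothetical table with columns full_name (STRING) and age (INTEGER).
table = client.get_table(client.dataset('my_dataset').table('people'))

rows = [(u'Phred Phlyntstone', 32), (u'Wylma Phlyntstone', 29)]

# Old name, kept as a deprecated alias that now emits a DeprecationWarning:
errors = client.create_rows(table, rows)

# New name, matching the tabledata.insertAll request it wraps:
errors = client.insert_rows(table, rows)

assert errors == []  # an empty list means every row was accepted
```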
3 changes: 3 additions & 0 deletions bigquery/CHANGELOG.md
@@ -24,6 +24,9 @@

## Interface changes / breaking changes

+ - Add `Client.insert_rows()` and `Client.insert_rows_json()`, deprecate
+   `Client.create_rows()` and `Client.create_rows_json()`.
+   ([#4657](https://github.com/GoogleCloudPlatform/google-cloud-python/pull/4657))
- Add `Client.list_tables`, deprecate `Client.list_dataset_tables`.
([#4653](https://github.com/GoogleCloudPlatform/google-cloud-python/pull/4653))
- `Client.list_tables` returns an iterator of `TableListItem`. The API
31 changes: 26 additions & 5 deletions bigquery/google/cloud/bigquery/client.py
@@ -1007,8 +1007,8 @@ def query(self, query, job_config=None, job_id=None, job_id_prefix=None,
job._begin(retry=retry)
return job

- def create_rows(self, table, rows, selected_fields=None, **kwargs):
-     """API call: insert table data via a POST request
+ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
+     """Insert rows into a table via the streaming API.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
@@ -1073,12 +1073,12 @@ def create_rows(self, table, rows, selected_fields=None, **kwargs):

json_rows.append(json_row)

- return self.create_rows_json(table, json_rows, **kwargs)
+ return self.insert_rows_json(table, json_rows, **kwargs)

- def create_rows_json(self, table, json_rows, row_ids=None,
+ def insert_rows_json(self, table, json_rows, row_ids=None,
skip_invalid_rows=None, ignore_unknown_values=None,
template_suffix=None, retry=DEFAULT_RETRY):
"""API call: insert table data via a POST request
"""Insert rows into a table without applying local type conversions.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
@@ -1162,6 +1162,27 @@ def create_rows_json(self, table, json_rows, row_ids=None,

return errors

+ def create_rows(self, *args, **kwargs):
+     """DEPRECATED: Insert rows into a table via the streaming API.
+     Use :func:`~google.cloud.bigquery.client.Client.insert_rows` instead.
+     """
+     warnings.warn(
+         'create_rows is deprecated, use insert_rows instead.',
+         DeprecationWarning)
+     return self.insert_rows(*args, **kwargs)
+
+ def create_rows_json(self, *args, **kwargs):
+     """DEPRECATED: Insert rows into a table without type conversions.
+     Use :func:`~google.cloud.bigquery.client.Client.insert_rows_json`
+     instead.
+     """
+     warnings.warn(
+         'create_rows_json is deprecated, use insert_rows_json instead.',
+         DeprecationWarning)
+     return self.insert_rows_json(*args, **kwargs)

def list_rows(self, table, selected_fields=None, max_results=None,
page_token=None, start_index=None, retry=DEFAULT_RETRY):
"""List the rows of the table.
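The old names survive as thin wrappers, so existing callers keep working. As a sketch of how downstream code might surface the new warning while migrating, assuming `client` and `table` as in the example near the top (Python silences `DeprecationWarning` by default):

```python
import warnings

# Opt in to DeprecationWarning so lingering create_rows / create_rows_json
# call sites show up while migrating to the new names.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always', DeprecationWarning)
    client.create_rows(table, [(u'Phred Phlyntstone', 32)])  # deprecated alias

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```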
43 changes: 24 additions & 19 deletions bigquery/tests/system.py
@@ -124,9 +124,9 @@ def _still_in_use(bad_request):
for doomed in self.to_delete:
if isinstance(doomed, Bucket):
retry_409(doomed.delete)(force=True)
- elif isinstance(doomed, Dataset):
+ elif isinstance(doomed, (Dataset, bigquery.DatasetReference)):
retry_in_use(Config.CLIENT.delete_dataset)(doomed)
- elif isinstance(doomed, Table):
+ elif isinstance(doomed, (Table, bigquery.TableReference)):
retry_in_use(Config.CLIENT.delete_table)(doomed)
else:
doomed.delete()
@@ -327,7 +327,7 @@ def _fetch_single_page(table, selected_fields=None):
page = six.next(iterator.pages)
return list(page)

- def test_create_rows_then_dump_table(self):
+ def test_insert_rows_then_dump_table(self):
NOW_SECONDS = 1448911495.484366
NOW = datetime.datetime.utcfromtimestamp(
NOW_SECONDS).replace(tzinfo=UTC)
@@ -339,7 +339,7 @@ def test_create_rows_then_dump_table(self):
]
ROW_IDS = range(len(ROWS))

- dataset = self.temp_dataset(_make_dataset_id('create_rows_then_dump'))
+ dataset = self.temp_dataset(_make_dataset_id('insert_rows_then_dump'))
TABLE_ID = 'test_table'
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
@@ -352,7 +352,7 @@ def test_create_rows_then_dump_table(self):
self.to_delete.insert(0, table)
self.assertTrue(_table_exists(table))

- errors = Config.CLIENT.create_rows(table, ROWS, row_ids=ROW_IDS)
+ errors = Config.CLIENT.insert_rows(table, ROWS, row_ids=ROW_IDS)
self.assertEqual(len(errors), 0)

rows = ()
@@ -1315,7 +1315,7 @@ def test_query_external_table(self):
self.assertEqual(sorted(row_tuples, key=by_age),
sorted(ROWS, key=by_age))

- def test_create_rows_nested_nested(self):
+ def test_insert_rows_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
schema = [
@@ -1342,14 +1342,14 @@ def test_create_rows_nested_nested(self):
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)

- Config.CLIENT.create_rows(table, to_insert)
+ Config.CLIENT.insert_rows(table, to_insert)

retry = RetryResult(_has_rows, max_tries=8)
rows = retry(self._fetch_single_page)(table)
row_tuples = [r.values() for r in rows]
self.assertEqual(row_tuples, to_insert)

- def test_create_rows_nested_nested_dictionary(self):
+ def test_insert_rows_nested_nested_dictionary(self):
# See #2951
SF = bigquery.SchemaField
schema = [
@@ -1376,7 +1376,7 @@ def test_create_rows_nested_nested_dictionary(self):
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)

- Config.CLIENT.create_rows(table, to_insert)
+ Config.CLIENT.insert_rows(table, to_insert)

retry = RetryResult(_has_rows, max_tries=8)
rows = retry(self._fetch_single_page)(table)
@@ -1402,7 +1402,7 @@ def test_create_table_rows_fetch_nested_schema(self):
for line in rows_file:
to_insert.append(json.loads(line))

- errors = Config.CLIENT.create_rows_json(table, to_insert)
+ errors = Config.CLIENT.insert_rows_json(table, to_insert)
self.assertEqual(len(errors), 0)

retry = RetryResult(_has_rows, max_tries=8)
@@ -1467,19 +1467,24 @@ def test_nested_table_to_dataframe(self):
'nested_record': {'nested_nested_string': 'some deep insight'},
}
to_insert = [
- ('Some value', record)
+ {'string_col': 'Some value', 'record_col': record},
]
+ rows = [json.dumps(row) for row in to_insert]
+ body = six.StringIO('{}\n'.format('\n'.join(rows)))
table_id = 'test_table'
dataset = self.temp_dataset(_make_dataset_id('nested_df'))
- table_arg = Table(dataset.table(table_id), schema=schema)
- table = retry_403(Config.CLIENT.create_table)(table_arg)
+ table = dataset.table(table_id)
self.to_delete.insert(0, table)
- Config.CLIENT.create_rows(table, to_insert)
- QUERY = 'SELECT * from `{}.{}.{}`'.format(
-     Config.CLIENT.project, dataset.dataset_id, table_id)

- retry = RetryResult(_has_rows, max_tries=8)
- df = retry(self._fetch_dataframe)(QUERY)
+ job_config = bigquery.LoadJobConfig()
+ job_config.write_disposition = 'WRITE_TRUNCATE'
+ job_config.source_format = 'NEWLINE_DELIMITED_JSON'
+ job_config.schema = schema
+ # Load a table using a local JSON file from memory.
+ Config.CLIENT.load_table_from_file(
+     body, table, job_config=job_config).result()

+ df = Config.CLIENT.list_rows(
+     table, selected_fields=schema).to_dataframe()

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 1) # verify the number of rows
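Note that `test_nested_table_to_dataframe` above switches from streaming inserts to a load job; freshly streamed rows can take time to become listable, whereas a completed load job's rows are available immediately. A condensed sketch of that pattern, with illustrative names that are not taken from the diff:

```python
import json

import six

from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset('my_dataset').table('people')  # hypothetical

schema = [
    bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
]
rows = [{'full_name': 'Phred Phlyntstone', 'age': 32}]
body = six.StringIO('\n'.join(json.dumps(row) for row in rows) + '\n')

job_config = bigquery.LoadJobConfig()
job_config.source_format = 'NEWLINE_DELIMITED_JSON'
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.schema = schema

# .result() blocks until the load job finishes, so the rows are listable
# and queryable as soon as this returns.
client.load_table_from_file(body, table_ref, job_config=job_config).result()
```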
40 changes: 26 additions & 14 deletions docs/bigquery/snippets.py
@@ -23,6 +23,7 @@
need to be deleted during teardown.
"""

+ import json
import time

import pytest
@@ -64,9 +65,9 @@ def to_delete(client):
doomed = []
yield doomed
for item in doomed:
- if isinstance(item, bigquery.Dataset):
+ if isinstance(item, (bigquery.Dataset, bigquery.DatasetReference)):
client.delete_dataset(item)
- elif isinstance(item, bigquery.Table):
+ elif isinstance(item, (bigquery.Table, bigquery.TableReference)):
client.delete_table(item)
else:
item.delete()
@@ -414,28 +415,28 @@ def test_update_table_multiple_properties(client, to_delete):
# [END update_table_multiple_properties]


- def test_table_create_rows(client, to_delete):
+ def test_table_insert_rows(client, to_delete):
"""Insert / fetch table data."""
- DATASET_ID = 'table_create_rows_dataset_{}'.format(_millis())
- TABLE_ID = 'table_create_rows_table_{}'.format(_millis())
- dataset = bigquery.Dataset(client.dataset(DATASET_ID))
+ dataset_id = 'table_insert_rows_dataset_{}'.format(_millis())
+ table_id = 'table_insert_rows_table_{}'.format(_millis())
+ dataset = bigquery.Dataset(client.dataset(dataset_id))
dataset = client.create_dataset(dataset)
to_delete.append(dataset)

- table = bigquery.Table(dataset.table(TABLE_ID), schema=SCHEMA)
+ table = bigquery.Table(dataset.table(table_id), schema=SCHEMA)
table = client.create_table(table)
to_delete.insert(0, table)

- # [START table_create_rows]
- ROWS_TO_INSERT = [
+ # [START table_insert_rows]
+ rows_to_insert = [
(u'Phred Phlyntstone', 32),
(u'Wylma Phlyntstone', 29),
]

- errors = client.create_rows(table, ROWS_TO_INSERT)  # API request
+ errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []
- # [END table_create_rows]
+ # [END table_insert_rows]


def test_load_table_from_file(client, to_delete):
@@ -600,9 +601,20 @@ def test_extract_table(client, to_delete):
to_delete.append(dataset)

table_ref = dataset.table('person_ages')
- table = client.create_table(bigquery.Table(table_ref, schema=SCHEMA))
- to_delete.insert(0, table)
- client.create_rows(table, ROWS)
+ to_insert = [
+     {'full_name': name, 'age': age}
+     for name, age in ROWS
+ ]
+ rows = [json.dumps(row) for row in to_insert]
+ body = six.StringIO('{}\n'.format('\n'.join(rows)))
+ job_config = bigquery.LoadJobConfig()
+ job_config.write_disposition = 'WRITE_TRUNCATE'
+ job_config.source_format = 'NEWLINE_DELIMITED_JSON'
+ job_config.schema = SCHEMA
+ to_delete.insert(0, table_ref)
+ # Load a table using a local JSON file from memory.
+ client.load_table_from_file(
+     body, table_ref, job_config=job_config).result()

bucket_name = 'extract_person_ages_job_{}'.format(_millis())
# [START extract_table]
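The `table_insert_rows` snippet above covers `Client.insert_rows`; as a companion, a sketch of `Client.insert_rows_json`, which sends the JSON payload as-is and accepts optional per-row insert IDs for best-effort de-duplication (table name and IDs here are illustrative, not part of the diff):

```python
from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table(
    client.dataset('my_dataset').table('people'))  # hypothetical

json_rows = [
    {'full_name': 'Phred Phlyntstone', 'age': 32},
    {'full_name': 'Wylma Phlyntstone', 'age': 29},
]

# No client-side type conversion is applied; values must already be in the
# shapes the REST API expects.
errors = client.insert_rows_json(
    table, json_rows, row_ids=['phred-0', 'wylma-1'])
assert errors == []
```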
4 changes: 2 additions & 2 deletions docs/bigquery/usage.rst
@@ -176,8 +176,8 @@ Utilize iterator properties returned with row data:
Insert rows into a table's data:

.. literalinclude:: snippets.py
- :start-after: [START table_create_rows]
- :end-before: [END table_create_rows]
+ :start-after: [START table_insert_rows]
+ :end-before: [END table_insert_rows]

Upload table data from a file:

