
Commit

When appending to a table, load if the dataframe contains a subset of the existing schema (googleapis#24)

* Improvements discussed in PR conversation

Accidentally left a duplicate test in

Correcting change to schema made by auto-rebase

Fixing missing assertTrue and reversion to not checking subset on append (both from rebase)

Replacing AssertEqual

Shortening line to pass flake

* Making updates per jreback's requested changes

* Fixing trailing whitespace

* Adding detail to changelog

* Use wait_for_job rather than sleep

* Revert "Use wait_for_job rather than sleep"

This reverts commit 8726a01.

* Minor tweaks before merging

* Update the to_gbq doc-string as suggested by @jreback

* Make travis happy
mr-mcox authored and parthea committed Jun 13, 2017
1 parent 47bcc66 commit c210de1
Showing 4 changed files with 171 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
@@ -6,6 +6,8 @@ Changelog

- Resolve issue where the optional ``--noauth_local_webserver`` command line argument would not be propagated during the authentication process. (:issue:`35`)
- Drop support for Python 3.4 (:issue:`40`)
- The dataframe passed to ``.to_gbq(...., if_exists='append')`` needs to contain only a subset of the fields in the BigQuery schema. (:issue:`24`)


0.1.6 / 2017-05-03
------------------
2 changes: 1 addition & 1 deletion docs/source/writing.rst
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.

If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
be written to the table using the defined table schema and column types. The
dataframe must match the destination table in structure and data types.
dataframe must contain fields (matching name and type) currently in the destination table.
If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
different schema, a delay of 2 minutes will be forced to ensure that the new schema
has propagated in the Google environment. See
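As an aside, here is a minimal usage sketch of the append behaviour documented above; the dataset, table, and project names are placeholders for illustration, not values taken from this commit.

import pandas as pd

from pandas_gbq import gbq

# The destination table already has columns A (FLOAT), B (FLOAT) and C (STRING);
# the dataframe only carries A and B, i.e. a subset of the remote schema.
df = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

# With this change the append is accepted instead of raising InvalidSchema,
# because every local field exists (same name and type) in the destination table.
gbq.to_gbq(df, 'my_dataset.my_table', 'my-project-id', if_exists='append')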
92 changes: 83 additions & 9 deletions pandas_gbq/gbq.py
@@ -557,7 +557,25 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):

self._print("\n")

def verify_schema(self, dataset_id, table_id, schema):
def schema(self, dataset_id, table_id):
"""Retrieve the schema of the table
Obtain from BigQuery the field names and field types
for the table defined by the parameters
Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table
Returns
-------
list of dicts
Fields representing the schema
"""

try:
from googleapiclient.errors import HttpError
except:
Expand All @@ -573,15 +591,67 @@ def verify_schema(self, dataset_id, table_id, schema):
'type': field_remote['type']}
for field_remote in remote_schema['fields']]

fields_remote = set([json.dumps(field_remote)
for field_remote in remote_fields])
fields_local = set(json.dumps(field_local)
for field_local in schema['fields'])

return fields_remote == fields_local
return remote_fields
except HttpError as ex:
self.process_http_error(ex)

    def verify_schema(self, dataset_id, table_id, schema):
        """Indicate whether schemas match exactly

        Compare the BigQuery table identified in the parameters with
        the schema passed in and indicate whether all fields in the former
        are present in the latter. Order is not considered.

        Parameters
        ----------
        dataset_id : str
            Name of the BigQuery dataset for the table
        table_id : str
            Name of the BigQuery table
        schema : list(dict)
            Schema for comparison. Each item should have
            a 'name' and a 'type'

        Returns
        -------
        bool
            Whether the schemas match
        """

        fields_remote = sorted(self.schema(dataset_id, table_id),
                               key=lambda x: x['name'])
        fields_local = sorted(schema['fields'], key=lambda x: x['name'])

        return fields_remote == fields_local

    def schema_is_subset(self, dataset_id, table_id, schema):
        """Indicate whether the schema to be uploaded is a subset

        Compare the BigQuery table identified in the parameters with
        the schema passed in and indicate whether a subset of the fields in
        the former are present in the latter. Order is not considered.

        Parameters
        ----------
        dataset_id : str
            Name of the BigQuery dataset for the table
        table_id : str
            Name of the BigQuery table
        schema : list(dict)
            Schema for comparison. Each item should have
            a 'name' and a 'type'

        Returns
        -------
        bool
            Whether the passed schema is a subset
        """

        fields_remote = self.schema(dataset_id, table_id)
        fields_local = schema['fields']

        return all(field in fields_remote for field in fields_local)

    def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
        delay = 0

@@ -810,7 +880,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        'fail': If table exists, do nothing.
        'replace': If table exists, drop it, recreate it, and insert data.
        'append': If table exists, insert data. Create if does not exist.
        'append': If table exists and the dataframe schema is a subset of
        the destination table schema, insert data. Create destination table
        if it does not exist.
    private_key : str (optional)
        Service account private key in JSON format. Can be file path
        or string contents. This is useful for remote server
@@ -844,7 +916,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
            connector.delete_and_recreate_table(
                dataset_id, table_id, table_schema)
        elif if_exists == 'append':
            if not connector.verify_schema(dataset_id, table_id, table_schema):
            if not connector.schema_is_subset(dataset_id,
                                              table_id,
                                              table_schema):
                raise InvalidSchema("Please verify that the structure and "
                                    "data types in the DataFrame match the "
                                    "schema of the destination table.")
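To summarise the two comparison strategies added to gbq.py above, here is a small self-contained sketch (plain Python written for this summary, not lines from the diff): verify_schema requires an exact field-for-field match, while schema_is_subset only requires every local field to be present remotely.

remote_fields = [{'name': 'A', 'type': 'FLOAT'},
                 {'name': 'B', 'type': 'FLOAT'},
                 {'name': 'C', 'type': 'STRING'}]
local_schema = {'fields': [{'name': 'B', 'type': 'FLOAT'},
                           {'name': 'A', 'type': 'FLOAT'}]}

# Exact comparison, ignoring order (mirrors verify_schema): False here,
# because the remote table also has column C.
exact = (sorted(remote_fields, key=lambda f: f['name']) ==
         sorted(local_schema['fields'], key=lambda f: f['name']))

# Subset check (mirrors schema_is_subset): True here, so an append is allowed.
subset = all(field in remote_fields for field in local_schema['fields'])

print(exact, subset)  # prints: False True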
85 changes: 85 additions & 0 deletions pandas_gbq/tests/test_gbq.py
@@ -1071,6 +1071,31 @@ def test_upload_data_if_table_exists_append(self):
                   _get_project_id(), if_exists='append',
                   private_key=_get_private_key_path())

    def test_upload_subset_columns_if_table_exists_append(self):
        # Issue 24: Upload is successful if dataframe has columns
        # which are a subset of the current schema
        test_id = "16"
        test_size = 10
        df = make_mixed_dataframe_v2(test_size)
        df_subset_cols = df.iloc[:, :2]

        # Initialize table with sample data
        gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
                   chunksize=10000, private_key=_get_private_key_path())

        # Test the if_exists parameter with value 'append'
        gbq.to_gbq(df_subset_cols,
                   self.destination_table + test_id, _get_project_id(),
                   if_exists='append', private_key=_get_private_key_path())

        sleep(30)  # <- Curses Google!!!

        result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
                              .format(self.destination_table + test_id),
                              project_id=_get_project_id(),
                              private_key=_get_private_key_path())
        assert result['num_rows'][0] == test_size * 2

    def test_upload_data_if_table_exists_replace(self):
        test_id = "4"
        test_size = 10
@@ -1258,6 +1283,66 @@ def test_verify_schema_ignores_field_mode(self):
        assert self.sut.verify_schema(
            self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)

    def test_retrieve_schema(self):
        # Issue #24 schema function returns the schema in BigQuery
        test_id = "15"
        test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
                                  {'name': 'B', 'type': 'FLOAT'},
                                  {'name': 'C', 'type': 'STRING'},
                                  {'name': 'D', 'type': 'TIMESTAMP'}]}

        self.table.create(TABLE_ID + test_id, test_schema)
        actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
        expected = test_schema['fields']
        assert expected == actual, 'Expected schema used to create table'

    def test_schema_is_subset_passes_if_subset(self):
        # Issue #24 schema_is_subset indicates whether the schema of the
        # dataframe is a subset of the schema of the bigquery table
        test_id = '16'

        table_name = TABLE_ID + test_id
        dataset = self.dataset_prefix + '1'

        table_schema = {'fields': [{'name': 'A',
                                    'type': 'FLOAT'},
                                   {'name': 'B',
                                    'type': 'FLOAT'},
                                   {'name': 'C',
                                    'type': 'STRING'}]}
        tested_schema = {'fields': [{'name': 'A',
                                     'type': 'FLOAT'},
                                    {'name': 'B',
                                     'type': 'FLOAT'}]}

        self.table.create(table_name, table_schema)

        assert self.sut.schema_is_subset(
            dataset, table_name, tested_schema) is True

    def test_schema_is_subset_fails_if_not_subset(self):
        # For pull request #24
        test_id = '17'

        table_name = TABLE_ID + test_id
        dataset = self.dataset_prefix + '1'

        table_schema = {'fields': [{'name': 'A',
                                    'type': 'FLOAT'},
                                   {'name': 'B',
                                    'type': 'FLOAT'},
                                   {'name': 'C',
                                    'type': 'STRING'}]}
        tested_schema = {'fields': [{'name': 'A',
                                     'type': 'FLOAT'},
                                    {'name': 'C',
                                     'type': 'FLOAT'}]}

        self.table.create(table_name, table_schema)

        assert self.sut.schema_is_subset(
            dataset, table_name, tested_schema) is False

    def test_list_dataset(self):
        dataset_id = self.dataset_prefix + "1"
        assert dataset_id in self.dataset.datasets()
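For completeness, a hypothetical sketch (not part of this commit's test suite) of how schema_is_subset could be exercised without a live BigQuery project, by stubbing the new GbqConnector.schema() method; the use of unittest.mock and the bypassed __init__ are assumptions made purely for illustration.

from unittest import mock

from pandas_gbq import gbq


def test_schema_is_subset_with_stubbed_remote_schema():
    remote = [{'name': 'A', 'type': 'FLOAT'},
              {'name': 'B', 'type': 'FLOAT'},
              {'name': 'C', 'type': 'STRING'}]
    local = {'fields': [{'name': 'A', 'type': 'FLOAT'},
                        {'name': 'B', 'type': 'FLOAT'}]}

    # Build a connector without running __init__ (which would authenticate),
    # then stub schema() so no network call is made.
    connector = object.__new__(gbq.GbqConnector)
    with mock.patch.object(gbq.GbqConnector, 'schema', return_value=remote):
        assert connector.schema_is_subset('some_dataset', 'some_table', local)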
