Improvements discussed in PR conversation
Accidentally left a duplicate test in

Correcting change to schema made by auto-rebase

Fixing missing assertTrue and reversion to not checking subset on append (both from rebase)

Replacing AssertEqual

Shortening line to pass flake
mr-mcox committed May 12, 2017
1 parent 9dfd106 commit f612f95
Showing 4 changed files with 148 additions and 9 deletions.
4 changes: 4 additions & 0 deletions docs/source/changelog.rst
@@ -9,6 +9,10 @@ Changelog

- All gbq errors will simply be subclasses of ``ValueError`` and no longer inherit from the deprecated ``PandasError``.

0.1.5 / 2017-04-20
------------------
- When using ``to_gbq``, if ``if_exists`` is set to ``'append'``, the dataframe need only contain a subset of the fields in the BigQuery schema. GH#24

0.1.4 / 2017-03-17
------------------

2 changes: 1 addition & 1 deletion docs/source/writing.rst
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.

If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
be written to the table using the defined table schema and column types. The
dataframe must match the destination table in structure and data types.
dataframe's columns must each match a field (by name and type) in the destination table.
If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
different schema, a delay of 2 minutes will be forced to ensure that the new schema
has propagated in the Google environment. See
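To make the documented behaviour concrete, here is a minimal sketch of appending a DataFrame whose columns form a subset of the destination schema. The project ID and table name are placeholders, and the destination table is assumed to already have columns A (FLOAT), B (FLOAT) and C (STRING).

import pandas as pd
from pandas_gbq import gbq

# Placeholder identifiers; assumes 'my_dataset.my_table' already exists in
# project 'my-project' with columns A (FLOAT), B (FLOAT) and C (STRING).
df = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

# Column C is absent from the DataFrame, but because its fields are a subset
# of the destination schema, the append is accepted.
gbq.to_gbq(df, 'my_dataset.my_table', 'my-project', if_exists='append')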
68 changes: 60 additions & 8 deletions pandas_gbq/gbq.py
Expand Up @@ -557,7 +557,19 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):

self._print("\n")

def verify_schema(self, dataset_id, table_id, schema):
def schema(self, dataset_id, table_id):
"""Retrieve the schema of the table
Obtain from BigQuery the field names and field types
for the table defined by the parameters
:param str dataset_id: Name of the BigQuery dataset for the table
:param str table_id: Name of the BigQuery table
:return: Fields representing the schema
:rtype: list of dicts
"""

try:
from googleapiclient.errors import HttpError
except:
@@ -573,15 +585,53 @@ def verify_schema(self, dataset_id, table_id, schema):
'type': field_remote['type']}
for field_remote in remote_schema['fields']]

fields_remote = set([json.dumps(field_remote)
for field_remote in remote_fields])
fields_local = set(json.dumps(field_local)
for field_local in schema['fields'])

return fields_remote == fields_local
return remote_fields
except HttpError as ex:
self.process_http_error(ex)
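As a hedged usage sketch (project, dataset and table names are placeholders, and the table is assumed to exist), the new schema method returns the remote fields as a list of dicts:

from pandas_gbq import gbq

# Placeholder project; a live BigQuery connection is required for this to run.
connector = gbq.GbqConnector('my-project')
fields = connector.schema('my_dataset', 'my_table')
print(fields)  # e.g. [{'name': 'A', 'type': 'FLOAT'}, {'name': 'C', 'type': 'STRING'}]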

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly
Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether all fields in the former
are present in the latter. Order is not considered.
:param str dataset_id: Name of the BigQuery dataset for the table
:param str table_id: Name of the BigQuery table
:param list(dict) schema: Schema for comparison. Each item should have
a 'name' and a 'type'
:return: Whether the schemas match
:rtype: bool
"""

fields_remote = sorted(self.schema(dataset_id, table_id),
key=lambda x: x['name'])
fields_local = sorted(schema['fields'], key=lambda x: x['name'])

return fields_remote == fields_local

def schema_is_subset(self, dataset_id, table_id, schema):
"""Indicate whether the schema to be uploaded is a subset
Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether a subset of the fields in
the former are present in the latter. Order is not considered.
:param str dataset_id: Name of the BigQuery dataset for the table
:param str table_id: Name of the BigQuery table
:param list(dict) schema: Schema for comparison. Each item should have
a 'name' and a 'type'
:return: Whether the passed schema is a subset
:rtype: bool
"""

fields_remote = self.schema(dataset_id, table_id)
fields_local = schema['fields']

return all(field in fields_remote for field in fields_local)
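The two comparisons above can be illustrated with a self-contained sketch that uses the same field-dict shape as the tests and needs no BigQuery access:

# Fields are plain dicts with 'name' and 'type'; ordering is ignored.
remote_fields = [{'name': 'A', 'type': 'FLOAT'},
                 {'name': 'B', 'type': 'FLOAT'},
                 {'name': 'C', 'type': 'STRING'}]
local_fields = [{'name': 'B', 'type': 'FLOAT'},
                {'name': 'A', 'type': 'FLOAT'}]

# Exact comparison, as in verify_schema: sort both sides by name and compare.
exact_match = (sorted(remote_fields, key=lambda f: f['name']) ==
               sorted(local_fields, key=lambda f: f['name']))
print(exact_match)  # False -- the remote table has an extra field 'C'

# Subset comparison, as in schema_is_subset: every local field must exist remotely.
is_subset = all(field in remote_fields for field in local_fields)
print(is_subset)  # True -- 'A' and 'B' both appear in the remote schema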

def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
delay = 0

@@ -844,7 +894,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
connector.delete_and_recreate_table(
dataset_id, table_id, table_schema)
elif if_exists == 'append':
if not connector.verify_schema(dataset_id, table_id, table_schema):
if not connector.schema_is_subset(dataset_id,
table_id,
table_schema):
raise InvalidSchema("Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table.")
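Conversely, if the DataFrame carries a field that the destination table does not have, the append path above raises InvalidSchema. A hedged sketch with placeholder names:

import pandas as pd
from pandas_gbq import gbq

# Column 'Z' is assumed not to exist in the destination table.
df_bad = pd.DataFrame({'A': [1.0], 'Z': ['unexpected']})

try:
    gbq.to_gbq(df_bad, 'my_dataset.my_table', 'my-project', if_exists='append')
except gbq.InvalidSchema:
    print('DataFrame fields are not a subset of the destination schema')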
83 changes: 83 additions & 0 deletions pandas_gbq/tests/test_gbq.py
@@ -1071,6 +1071,30 @@ def test_upload_data_if_table_exists_append(self):
_get_project_id(), if_exists='append',
private_key=_get_private_key_path())

def test_upload_subset_columns_if_table_exists_append(self):
# For pull request #24
test_id = "16"
test_size = 10
df = make_mixed_dataframe_v2(test_size)
df_subset_cols = df.iloc[:, :2]

# Initialize table with sample data
gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
chunksize=10000, private_key=_get_private_key_path())

# Test the if_exists parameter with value 'append'
gbq.to_gbq(df_subset_cols,
self.destination_table + test_id, _get_project_id(),
if_exists='append', private_key=_get_private_key_path())

sleep(30) # <- Curses Google!!!

result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(self.destination_table + test_id),
project_id=_get_project_id(),
private_key=_get_private_key_path())
assert result['num_rows'][0] == test_size * 2

def test_upload_data_if_table_exists_replace(self):
test_id = "4"
test_size = 10
@@ -1258,6 +1282,65 @@ def test_verify_schema_ignores_field_mode(self):
assert self.sut.verify_schema(
self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)

def test_retrieve_schema(self):
# For pull request #24
test_id = "15"
test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'}]}

self.table.create(TABLE_ID + test_id, test_schema)
actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
expected = test_schema['fields']
assert expected == actual, 'Expected schema used to create table'

def test_schema_is_subset_passes_if_subset(self):
# For pull request #24
test_id = '16'

table_name = TABLE_ID + test_id
dataset = self.dataset_prefix + '1'

table_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'},
{'name': 'C',
'type': 'STRING'}]}
tested_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'}]}

self.table.create(table_name, table_schema)

assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is True

def test_schema_is_subset_fails_if_not_subset(self):
# For pull request #24
test_id = '17'

table_name = TABLE_ID + test_id
dataset = self.dataset_prefix + '1'

table_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'},
{'name': 'C',
'type': 'STRING'}]}
tested_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'C',
'type': 'FLOAT'}]}

self.table.create(table_name, table_schema)

assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is False

def test_list_dataset(self):
dataset_id = self.dataset_prefix + "1"
assert dataset_id in self.dataset.datasets()
