diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 53c1a1b9..011a65a2 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -6,6 +6,8 @@ Changelog
 - Resolve issue where the optional ``--noauth_local_webserver`` command line
   argument would not be propagated during the authentication process. (:issue:`35`)
 - Drop support for Python 3.4 (:issue:`40`)
+- The dataframe passed to ``.to_gbq(..., if_exists='append')`` need only contain a subset of the fields in the BigQuery schema. (:issue:`24`)
+
 0.1.6 / 2017-05-03
 ------------------
diff --git a/docs/source/writing.rst b/docs/source/writing.rst
index f0dc0aaa..2a30bc35 100644
--- a/docs/source/writing.rst
+++ b/docs/source/writing.rst
@@ -40,7 +40,7 @@
 a ``TableCreationError`` if the destination table already exists.
 If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
 be written to the table using the defined table schema and column types. The
-dataframe must match the destination table in structure and data types.
+dataframe must contain only fields (matching name and type) that exist in the destination table.
 If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
 different schema, a delay of 2 minutes will be forced to ensure that the new schema
 has propagated in the Google environment. See
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 8d389100..0c34124c 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -557,7 +557,25 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):
 
         self._print("\n")
 
-    def verify_schema(self, dataset_id, table_id, schema):
+    def schema(self, dataset_id, table_id):
+        """Retrieve the schema of the table
+
+        Obtain from BigQuery the field names and field types
+        for the table defined by the parameters
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+
+        Returns
+        -------
+        list of dicts
+            Fields representing the schema
+        """
+
         try:
             from googleapiclient.errors import HttpError
         except:
@@ -573,15 +591,67 @@ def verify_schema(self, dataset_id, table_id, schema):
                               'type': field_remote['type']}
                              for field_remote in remote_schema['fields']]
 
-            fields_remote = set([json.dumps(field_remote)
-                                 for field_remote in remote_fields])
-            fields_local = set(json.dumps(field_local)
-                               for field_local in schema['fields'])
-
-            return fields_remote == fields_local
+            return remote_fields
 
         except HttpError as ex:
             self.process_http_error(ex)
 
+    def verify_schema(self, dataset_id, table_id, schema):
+        """Indicate whether schemas match exactly
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether they define the
+        same set of fields. Order is not considered.
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+        schema : list(dict)
+            Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        Returns
+        -------
+        bool
+            Whether the schemas match
+        """
+
+        fields_remote = sorted(self.schema(dataset_id, table_id),
+                               key=lambda x: x['name'])
+        fields_local = sorted(schema['fields'], key=lambda x: x['name'])
+
+        return fields_remote == fields_local
+
+    def schema_is_subset(self, dataset_id, table_id, schema):
+        """Indicate whether the schema to be uploaded is a subset
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether every field in the
+        passed schema is also present in the table. Order is not considered.
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+        schema : list(dict)
+            Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        Returns
+        -------
+        bool
+            Whether the passed schema is a subset of the table's schema
+        """
+
+        fields_remote = self.schema(dataset_id, table_id)
+        fields_local = schema['fields']
+
+        return all(field in fields_remote for field in fields_local)
+
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         delay = 0
@@ -810,7 +880,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
     if_exists : {'fail', 'replace', 'append'}, default 'fail'
         'fail': If table exists, do nothing.
         'replace': If table exists, drop it, recreate it, and insert data.
-        'append': If table exists, insert data. Create if does not exist.
+        'append': If table exists and the dataframe schema is a subset of
+        the destination table schema, insert data. Create the destination
+        table if it does not exist.
     private_key : str (optional)
         Service account private key in JSON format. Can be file path
         or string contents. This is useful for remote server
@@ -844,7 +916,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
             connector.delete_and_recreate_table(
                 dataset_id, table_id, table_schema)
         elif if_exists == 'append':
-            if not connector.verify_schema(dataset_id, table_id, table_schema):
+            if not connector.schema_is_subset(dataset_id,
+                                              table_id,
+                                              table_schema):
                 raise InvalidSchema("Please verify that the structure and "
                                     "data types in the DataFrame match the "
                                     "schema of the destination table.")
diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py
index 2ca57b16..069bc7ee 100644
--- a/pandas_gbq/tests/test_gbq.py
+++ b/pandas_gbq/tests/test_gbq.py
@@ -1071,6 +1071,31 @@ def test_upload_data_if_table_exists_append(self):
                    _get_project_id(), if_exists='append',
                    private_key=_get_private_key_path())
 
+    def test_upload_subset_columns_if_table_exists_append(self):
+        # Issue 24: Upload is successful if dataframe has columns
+        # which are a subset of the current schema
+        test_id = "16"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_subset_cols = df.iloc[:, :2]
+
+        # Initialize table with sample data
+        gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
+                   chunksize=10000, private_key=_get_private_key_path())
+
+        # Test the if_exists parameter with value 'append'
+        gbq.to_gbq(df_subset_cols,
+                   self.destination_table + test_id, _get_project_id(),
+                   if_exists='append', private_key=_get_private_key_path())
+
+        sleep(30)  # <- Curses Google!!!
+
+        result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                              .format(self.destination_table + test_id),
+                              project_id=_get_project_id(),
+                              private_key=_get_private_key_path())
+        assert result['num_rows'][0] == test_size * 2
+
     def test_upload_data_if_table_exists_replace(self):
         test_id = "4"
         test_size = 10
@@ -1258,6 +1283,66 @@ def test_verify_schema_ignores_field_mode(self):
         assert self.sut.verify_schema(
             self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)
 
+    def test_retrieve_schema(self):
+        # Issue #24: the schema function returns the BigQuery table schema
+        test_id = "15"
+        test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                  {'name': 'B', 'type': 'FLOAT'},
+                                  {'name': 'C', 'type': 'STRING'},
+                                  {'name': 'D', 'type': 'TIMESTAMP'}]}
+
+        self.table.create(TABLE_ID + test_id, test_schema)
+        actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
+        expected = test_schema['fields']
+        assert expected == actual, 'Expected schema used to create table'
+
+    def test_schema_is_subset_passes_if_subset(self):
+        # Issue #24: schema_is_subset indicates whether the schema of the
+        # dataframe is a subset of the schema of the BigQuery table
+        test_id = '16'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'B',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is True
+
+    def test_schema_is_subset_fails_if_not_subset(self):
+        # Issue #24: schema_is_subset returns False if fields do not match
+        test_id = '17'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'C',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is False
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
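Note for reviewers: below is a minimal usage sketch of the behaviour this change enables. It is illustrative only; the project id, dataset and table names are placeholders, credentials are assumed to be configured, and the destination table is assumed to already exist with columns A (FLOAT), B (FLOAT) and C (STRING).

import pandas as pd
from pandas_gbq import gbq

# Dataframe carrying only a subset (A, B) of the destination table's columns.
df_subset = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

# Previously this raised InvalidSchema because the dataframe schema had to
# match the table schema exactly; with schema_is_subset the rows are appended
# and the missing column C is left NULL for the new rows.
gbq.to_gbq(df_subset, 'my_dataset.my_table', 'my-project-id',
           if_exists='append')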