From aedca10786ce814cb290ca55ad31427ef446c4dc Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 20 Aug 2019 15:21:22 -0700
Subject: [PATCH 1/8] Allow subset of schema to be passed into
 `load_table_from_dataframe`.

The types of any remaining columns will be auto-detected.
---
 .../google/cloud/bigquery/_pandas_helpers.py | 33 ++++---
 bigquery/google/cloud/bigquery/client.py     | 17 ++--
 bigquery/tests/unit/test_client.py           | 88 ++++++++++++++++++-
 3 files changed, 118 insertions(+), 20 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index db7f36f3d93e..fd8126d41c20 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,29 +187,42 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
-def dataframe_to_bq_schema(dataframe):
+def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
-    TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
-    schema for a subset of columns.
-
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame for which the client determines the BigQuery schema.
+        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+            A BigQuery schema. Use this argument to override the autodetected
+            type for some or all of the DataFrame columns.
 
     Returns:
         Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
             The automatically determined schema. Returns None if the type of
             any column cannot be determined.
     """
-    bq_schema = []
+    if bq_schema:
+        bq_schema_index = {field.name: field for field in bq_schema}
+    else:
+        bq_schema_index = {}
+
+    bq_schema_out = []
     for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+        # Use provided type from schema, if present.
+        bq_field = bq_schema_index.get(column)
+        if bq_field:
+            bq_schema_out.append(bq_field)
+            continue
+
+        # Otherwise, try to automatically determine the type based on the
+        # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
             return None
         bq_field = schema.SchemaField(column, bq_type)
-        bq_schema.append(bq_field)
-    return tuple(bq_schema)
+        bq_schema_out.append(bq_field)
+    return tuple(bq_schema_out)
 
 
 def dataframe_to_arrow(dataframe, bq_schema):
@@ -217,7 +230,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Arrow table.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
@@ -255,7 +268,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Parquet file.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
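The override logic above is keyed on column name: a field from the caller's `bq_schema` wins, and only the remaining columns fall through to the `_PANDAS_DTYPE_TO_BQ` lookup. A minimal standalone sketch of the same merge idea (the dtype map is abbreviated, and plain `(name, type)` tuples stand in for `SchemaField` objects):

    import pandas

    DTYPE_TO_BQ = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN"}

    def merge_schema(dataframe, overrides):
        # Index the caller-supplied fields by column name for O(1) lookup.
        by_name = dict(overrides)
        merged = []
        for column, dtype in zip(dataframe.columns, dataframe.dtypes):
            # An explicit override takes precedence over dtype-based detection.
            bq_type = by_name.get(column) or DTYPE_TO_BQ.get(dtype.name)
            if not bq_type:
                return None  # type of this column cannot be determined
            merged.append((column, bq_type))
        return tuple(merged)

    df = pandas.DataFrame({"x": [1, 2], "y": [1.0, float("nan")]})
    print(merge_schema(df, [("y", "INTEGER")]))
    # (('x', 'INTEGER'), ('y', 'INTEGER'))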
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 1b13ee126a5d..8152b2ff5809 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1532,14 +1532,15 @@ def load_table_from_dataframe(
         if location is None:
             location = self.location
 
-        if not job_config.schema:
-            autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
-
-            # Only use an explicit schema if we were able to determine one
-            # matching the dataframe. If not, fallback to the pandas to_parquet
-            # method.
-            if autodetected_schema:
-                job_config.schema = autodetected_schema
+        autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(
+            dataframe, job_config.schema
+        )
+
+        # Only use an explicit schema if we were able to determine one
+        # matching the dataframe. If not, fall back to the pandas to_parquet
+        # method.
+        if autodetected_schema:
+            job_config.schema = autodetected_schema
 
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 8a2a1228cd65..fe39e3ee2944 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5432,6 +5432,90 @@ def test_load_table_from_dataframe_struct_fields_error(self):
         assert "struct" in err_msg
         assert "not support" in err_msg
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_partial_automatic_schema(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        dt_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        )
+        ts_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        ).dt.tz_localize(pytz.utc)
+        df_data = {
+            "int_col": [1, 2, 3],
+            "int_as_float_col": [1.0, float("nan"), 3.0],
+            "float_col": [1.0, 2.0, 3.0],
+            "bool_col": [True, False, True],
+            "dt_col": dt_col,
+            "ts_col": ts_col,
+            "string_col": ["abc", "def", "ghi"],
+        }
+        dataframe = pandas.DataFrame(
+            df_data,
+            columns=[
+                "int_col",
+                "int_as_float_col",
+                "float_col",
+                "bool_col",
+                "dt_col",
+                "ts_col",
+                "string_col",
+            ],
+        )
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        schema = (
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("string_col", "STRING"),
+        )
+        job_config = job.LoadJobConfig(schema=schema)
+        with load_patch as load_table_from_file:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert tuple(sent_config.schema) == (
+            SchemaField("int_col", "INTEGER"),
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("float_col", "FLOAT"),
+            SchemaField("bool_col", "BOOLEAN"),
+            SchemaField("dt_col", "DATETIME"),
+            SchemaField("ts_col", "TIMESTAMP"),
+            SchemaField("string_col", "STRING"),
+        )
+
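The test above pins down the intended calling convention: only ambiguous columns need an explicit type. A hypothetical usage sketch (the project, dataset, and table IDs are placeholders, not values from this change):

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

    dataframe = pandas.DataFrame(
        {"int_as_float_col": [1.0, float("nan"), 3.0], "float_col": [1.0, 2.0, 3.0]}
    )
    # The NaN forces a float64 dtype, so int_as_float_col would otherwise be
    # detected as FLOAT; float_col is still auto-detected.
    job_config = bigquery.LoadJobConfig(
        schema=[bigquery.SchemaField("int_as_float_col", "INTEGER")]
    )
    load_job = client.load_table_from_dataframe(
        dataframe, table_ref, job_config=job_config
    )
    load_job.result()  # wait for the load job to finish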
SchemaField("float_col", "FLOAT"), + SchemaField("bool_col", "BOOLEAN"), + SchemaField("dt_col", "DATETIME"), + SchemaField("ts_col", "TIMESTAMP"), + SchemaField("string_col", "STRING"), + ) + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_schema_wo_pyarrow(self): @@ -5441,7 +5525,7 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self): client = self._make_client() records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}] - dataframe = pandas.DataFrame(records) + dataframe = pandas.DataFrame(records, columns=["name", "age"]) schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER")) job_config = job.LoadJobConfig(schema=schema) @@ -5553,7 +5637,7 @@ def test_load_table_from_dataframe_w_nulls(self): client = self._make_client() records = [{"name": None, "age": None}, {"name": None, "age": None}] - dataframe = pandas.DataFrame(records) + dataframe = pandas.DataFrame(records, columns=["name", "age"]) schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] job_config = job.LoadJobConfig(schema=schema) From a580425f94e29fb600768903696b38695761b417 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 21 Aug 2019 10:54:49 -0700 Subject: [PATCH 2/8] Use OrderedDict for brevity. --- bigquery/tests/unit/test_client.py | 65 ++++++++++++++---------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index fe39e3ee2944..b122e69d2208 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5440,43 +5440,38 @@ def test_load_table_from_dataframe_w_partial_automatic_schema(self): from google.cloud.bigquery.schema import SchemaField client = self._make_client() - dt_col = pandas.Series( - [ - datetime.datetime(2010, 1, 2, 3, 44, 50), - datetime.datetime(2011, 2, 3, 14, 50, 59), - datetime.datetime(2012, 3, 14, 15, 16), - ], - dtype="datetime64[ns]", - ) - ts_col = pandas.Series( + df_data = collections.OrderedDict( [ - datetime.datetime(2010, 1, 2, 3, 44, 50), - datetime.datetime(2011, 2, 3, 14, 50, 59), - datetime.datetime(2012, 3, 14, 15, 16), - ], - dtype="datetime64[ns]", - ).dt.tz_localize(pytz.utc) - df_data = { - "int_col": [1, 2, 3], - "int_as_float_col": [1.0, float("nan"), 3.0], - "float_col": [1.0, 2.0, 3.0], - "bool_col": [True, False, True], - "dt_col": dt_col, - "ts_col": ts_col, - "string_col": ["abc", "def", "ghi"], - } - dataframe = pandas.DataFrame( - df_data, - columns=[ - "int_col", - "int_as_float_col", - "float_col", - "bool_col", - "dt_col", - "ts_col", - "string_col", - ], + ("int_col", [1, 2, 3]), + ("int_as_float_col", [1.0, float("nan"), 3.0]), + ("float_col", [1.0, 2.0, 3.0]), + ("bool_col", [True, False, True]), + ( + "dt_col", + pandas.Series( + [ + datetime.datetime(2010, 1, 2, 3, 44, 50), + datetime.datetime(2011, 2, 3, 14, 50, 59), + datetime.datetime(2012, 3, 14, 15, 16), + ], + dtype="datetime64[ns]", + ), + ), + ( + "ts_col", + pandas.Series( + [ + datetime.datetime(2010, 1, 2, 3, 44, 50), + datetime.datetime(2011, 2, 3, 14, 50, 59), + datetime.datetime(2012, 3, 14, 15, 16), + ], + dtype="datetime64[ns]", + ).dt.tz_localize(pytz.utc), + ), + ("string_col", ["abc", "def", "ghi"]), + ] ) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True ) From 105ebc9cc980a4eecd058504288380d38d500927 Mon 
From 105ebc9cc980a4eecd058504288380d38d500927 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 11:57:46 -0700
Subject: [PATCH 3/8] Use OrderedDict for DataFrame in system tests.

---
 bigquery/tests/system.py | 119 ++++++++++++++++++++++-----------------
 1 file changed, 67 insertions(+), 52 deletions(-)

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 59a72297ed87..1174980faada 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
         )
         num_rows = 100
         nulls = [None] * num_rows
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": nulls,
-                "bytes_col": nulls,
-                "date_col": nulls,
-                "dt_col": nulls,
-                "float_col": nulls,
-                "geo_col": nulls,
-                "int_col": nulls,
-                "num_col": nulls,
-                "str_col": nulls,
-                "time_col": nulls,
-                "ts_col": nulls,
-            }
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", nulls),
+                ("bytes_col", nulls),
+                ("date_col", nulls),
+                ("dt_col", nulls),
+                ("float_col", nulls),
+                ("geo_col", nulls),
+                ("int_col", nulls),
+                ("num_col", nulls),
+                ("str_col", nulls),
+                ("time_col", nulls),
+                ("ts_col", nulls),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
         )
 
         records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
-        dataframe = pandas.DataFrame(records)
+        dataframe = pandas.DataFrame(records, columns=["name", "age"])
         job_config = bigquery.LoadJobConfig(schema=table_schema)
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
         # https://jira.apache.org/jira/browse/ARROW-2587
         # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
         )
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": [True, None, False],
-                "bytes_col": [b"abc", None, b"def"],
-                "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
-                "dt_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0),
-                    None,
-                    datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
-                ],
-                "float_col": [float("-inf"), float("nan"), float("inf")],
-                "geo_col": [
-                    "POINT(30 10)",
-                    None,
-                    "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
-                ],
-                "int_col": [-9223372036854775808, None, 9223372036854775807],
-                "num_col": [
-                    decimal.Decimal("-99999999999999999999999999999.999999999"),
-                    None,
-                    decimal.Decimal("99999999999999999999999999999.999999999"),
-                ],
-                "str_col": ["abc", None, "def"],
-                "time_col": [
-                    datetime.time(0, 0, 0),
-                    None,
-                    datetime.time(23, 59, 59, 999999),
-                ],
-                "ts_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-                    None,
-                    datetime.datetime(
-                        9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
-                    ),
-                ],
-            },
-            dtype="object",
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", [True, None, False]),
+                ("bytes_col", [b"abc", None, b"def"]),
+                (
+                    "date_col",
+                    [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
+                ),
+                (
+                    "dt_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0),
+                        None,
+                        datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                    ],
+                ),
+                ("float_col", [float("-inf"), float("nan"), float("inf")]),
+                (
+                    "geo_col",
+                    [
+                        "POINT(30 10)",
+                        None,
+                        "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+                    ],
+                ),
+                ("int_col", [-9223372036854775808, None, 9223372036854775807]),
+                (
+                    "num_col",
+                    [
+                        decimal.Decimal("-99999999999999999999999999999.999999999"),
+                        None,
+                        decimal.Decimal("99999999999999999999999999999.999999999"),
], + ), + ("str_col", ["abc", None, "def"]), + ( + "time_col", + [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], + ), + ( + "ts_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime( + 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc + ), + ], + ), + ] ) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) dataset_id = _make_dataset_id("bq_load_test") self.temp_dataset(dataset_id) From b8489fd466898adb2fb5ba83cbdcbe025b55e8de Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 21 Aug 2019 15:22:45 -0700 Subject: [PATCH 4/8] Warn when it's not possible to determine a column type. --- .../google/cloud/bigquery/_pandas_helpers.py | 1 + bigquery/google/cloud/bigquery/client.py | 20 ++-- bigquery/tests/unit/test_client.py | 111 +++++++++++++++++- 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index fd8126d41c20..b95aae761356 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -219,6 +219,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): # pandas dtype. bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) if not bq_type: + warnings.warn("Unable to determine type of column '{}'.".format(column)) return None bq_field = schema.SchemaField(column, bq_type) bq_schema_out.append(bq_field) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 8152b2ff5809..0a1630e5cbad 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1532,6 +1532,15 @@ def load_table_from_dataframe( if location is None: location = self.location + if job_config.schema: + for field in job_config.schema: + if field.field_type in _STRUCT_TYPES: + raise ValueError( + "Uploading dataframes with struct (record) column types " + "is not supported. See: " + "https://github.com/googleapis/google-cloud-python/issues/8191" + ) + autodetected_schema = _pandas_helpers.dataframe_to_bq_schema( dataframe, job_config.schema ) @@ -1541,20 +1550,13 @@ def load_table_from_dataframe( # method. if autodetected_schema: job_config.schema = autodetected_schema + else: + job_config.schema = () tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8])) os.close(tmpfd) try: - if job_config.schema: - for field in job_config.schema: - if field.field_type in _STRUCT_TYPES: - raise ValueError( - "Uploading dataframes with struct (record) column types " - "is not supported. 
See: " - "https://github.com/googleapis/google-cloud-python/issues/8191" - ) - if pyarrow and job_config.schema: if parquet_compression == "snappy": # adjust the default value parquet_compression = parquet_compression.upper() diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index b122e69d2208..af3fb141e840 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5434,7 +5434,7 @@ def test_load_table_from_dataframe_struct_fields_error(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_partial_automatic_schema(self): + def test_load_table_from_dataframe_w_partial_schema(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES from google.cloud.bigquery import job from google.cloud.bigquery.schema import SchemaField @@ -5468,7 +5468,8 @@ def test_load_table_from_dataframe_w_partial_automatic_schema(self): dtype="datetime64[ns]", ).dt.tz_localize(pytz.utc), ), - ("string_col", ["abc", "def", "ghi"]), + ("string_col", ["abc", None, "def"]), + ("bytes_col", [b"abc", b"def", None]), ] ) dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) @@ -5479,6 +5480,7 @@ def test_load_table_from_dataframe_w_partial_automatic_schema(self): schema = ( SchemaField("int_as_float_col", "INTEGER"), SchemaField("string_col", "STRING"), + SchemaField("bytes_col", "BYTES"), ) job_config = job.LoadJobConfig(schema=schema) with load_patch as load_table_from_file: @@ -5509,7 +5511,112 @@ def test_load_table_from_dataframe_w_partial_automatic_schema(self): SchemaField("dt_col", "DATETIME"), SchemaField("ts_col", "TIMESTAMP"), SchemaField("string_col", "STRING"), + SchemaField("bytes_col", "BYTES"), + ) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_partial_schema_extra_types(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + df_data = collections.OrderedDict( + [ + ("int_col", [1, 2, 3]), + ("int_as_float_col", [1.0, float("nan"), 3.0]), + ("string_col", ["abc", None, "def"]), + ] + ) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + schema = ( + SchemaField("int_as_float_col", "INTEGER"), + SchemaField("string_col", "STRING"), + SchemaField("unknown_col", "BYTES"), ) + job_config = job.LoadJobConfig(schema=schema) + with load_patch as load_table_from_file: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.PARQUET + assert tuple(sent_config.schema) == ( + SchemaField("int_col", "INTEGER"), + SchemaField("int_as_float_col", "INTEGER"), + SchemaField("string_col", "STRING"), + ) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def 
+    def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        df_data = collections.OrderedDict(
+            [
+                ("string_col", ["abc", "def", "ghi"]),
+                ("unknown_col", ["jkl", None, "mno"]),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        schema = (SchemaField("string_col", "STRING"),)
+        job_config = job.LoadJobConfig(schema=schema)
+        with load_patch as load_table_from_file, warnings.catch_warnings(
+            record=True
+        ) as warned:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        assert warned  # there should be at least one warning
+        unknown_col_warning = None
+        for warning in warned:
+            if "unknown_col" in str(warning):  # pragma: no cover
+                unknown_col_warning = warning
+        assert unknown_col_warning.category == UserWarning
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert tuple(sent_config.schema) == ()
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
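Since the helper now emits a UserWarning rather than silently dropping to autodetection, callers who consider an undetermined column type a bug can escalate it. A hedged sketch, reusing the placeholder `client`, `dataframe`, `table_ref`, and `job_config` names from the earlier usage example:

    import warnings

    with warnings.catch_warnings():
        # Turn the "Unable to determine type of column ..." warning added in
        # this patch into a hard error instead of a silent fallback.
        warnings.filterwarnings(
            "error", message="Unable to determine type", category=UserWarning
        )
        client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config)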
From 718c17223a8844a15709be8f7ff6f7bf91c40cba Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 15:25:43 -0700
Subject: [PATCH 5/8] Refactor for coverage.

---
 bigquery/tests/unit/test_client.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index af3fb141e840..8a4f6630aafb 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5608,11 +5608,11 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
         )
 
         assert warned  # there should be at least one warning
-        unknown_col_warning = None
-        for warning in warned:
-            if "unknown_col" in str(warning):  # pragma: no cover
-                unknown_col_warning = warning
-        assert unknown_col_warning.category == UserWarning
+        unknown_col_warnings = [
+            warning for warning in warned if "unknown_col" in str(warning)
+        ]
+        assert unknown_col_warnings
+        assert unknown_col_warnings[0].category == UserWarning
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET

From 8788441ee4cdfe826ad3292aed8bb92c992cfd1a Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 15:38:57 -0700
Subject: [PATCH 6/8] Move struct type detection to _pandas_helpers

---
 .../google/cloud/bigquery/_pandas_helpers.py | 15 ++++++++++----
 bigquery/google/cloud/bigquery/client.py     | 20 +------------------
 bigquery/tests/unit/test_client.py           |  8 ++++----
 3 files changed, 16 insertions(+), 27 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index b95aae761356..ddbeb5c014c4 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -198,11 +198,18 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
             type for some or all of the DataFrame columns.
 
     Returns:
-        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
-            The automatically determined schema. Returns None if the type of
-            any column cannot be determined.
+        Sequence[google.cloud.bigquery.schema.SchemaField]:
+            The automatically determined schema. Returns empty tuple if the
+            type of any column cannot be determined.
     """
     if bq_schema:
+        for field in bq_schema:
+            if field.field_type in schema._STRUCT_TYPES:
+                raise ValueError(
+                    "Uploading dataframes with struct (record) column types "
+                    "is not supported. See: "
See: " + "https://github.com/googleapis/google-cloud-python/issues/8191" + ) bq_schema_index = {field.name: field for field in bq_schema} else: bq_schema_index = {} @@ -220,7 +227,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) if not bq_type: warnings.warn("Unable to determine type of column '{}'.".format(column)) - return None + return () bq_field = schema.SchemaField(column, bq_type) bq_schema_out.append(bq_field) return tuple(bq_schema_out) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 0a1630e5cbad..da169cb55bf2 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -61,7 +61,6 @@ from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.routine import Routine from google.cloud.bigquery.routine import RoutineReference -from google.cloud.bigquery.schema import _STRUCT_TYPES from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import _table_arg_to_table from google.cloud.bigquery.table import _table_arg_to_table_ref @@ -1532,27 +1531,10 @@ def load_table_from_dataframe( if location is None: location = self.location - if job_config.schema: - for field in job_config.schema: - if field.field_type in _STRUCT_TYPES: - raise ValueError( - "Uploading dataframes with struct (record) column types " - "is not supported. See: " - "https://github.com/googleapis/google-cloud-python/issues/8191" - ) - - autodetected_schema = _pandas_helpers.dataframe_to_bq_schema( + job_config.schema = _pandas_helpers.dataframe_to_bq_schema( dataframe, job_config.schema ) - # Only use an explicit schema if we were able to determine one - # matching the dataframe. If not, fallback to the pandas to_parquet - # method. 
-        if autodetected_schema:
-            job_config.schema = autodetected_schema
-        else:
-            job_config.schema = ()
-
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 8a4f6630aafb..2c156b36d7e4 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5230,7 +5230,7 @@ def test_load_table_from_dataframe(self):
         from google.cloud.bigquery import job
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(
@@ -5265,7 +5265,7 @@ def test_load_table_from_dataframe_w_client_location(self):
         from google.cloud.bigquery import job
 
         client = self._make_client(location=self.LOCATION)
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(
@@ -5300,7 +5300,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
         from google.cloud.bigquery import job
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
         job_config = job.LoadJobConfig()
@@ -5702,7 +5702,7 @@ def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(

From 1fa9389fb30919a277843f9960a8005aae935520 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 17:00:29 -0700
Subject: [PATCH 7/8] Set schema to None if it can't be autodetected.

---
 bigquery/google/cloud/bigquery/_pandas_helpers.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index ddbeb5c014c4..57ced8fac0c1 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -198,9 +198,9 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
             type for some or all of the DataFrame columns.
 
     Returns:
-        Sequence[google.cloud.bigquery.schema.SchemaField]:
-            The automatically determined schema. Returns empty tuple if the
-            type of any column cannot be determined.
+        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
+            The automatically determined schema. Returns None if the type of
+            any column cannot be determined.
     """
     if bq_schema:
         for field in bq_schema:
@@ -227,7 +227,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
             warnings.warn("Unable to determine type of column '{}'.".format(column))
-            return ()
+            return None
         bq_field = schema.SchemaField(column, bq_type)
         bq_schema_out.append(bq_field)
     return tuple(bq_schema_out)
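The round trip from None to () and back is deliberate: returning None leaves the job config's schema unset, which is what the client checks (see the `if pyarrow and job_config.schema:` branch above) before choosing between the schema-aware Arrow path and plain to_parquet type inference. A runnable mini-version of the final contract — an abbreviated dtype map standing in for the library's own, not the library code itself:

    import warnings

    import pandas

    DTYPE_TO_BQ = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN"}

    def to_bq_schema(dataframe):
        fields = []
        for column, dtype in zip(dataframe.columns, dataframe.dtypes):
            bq_type = DTYPE_TO_BQ.get(dtype.name)
            if not bq_type:
                warnings.warn("Unable to determine type of column '{}'.".format(column))
                return None  # leave the schema unset rather than send ()
            fields.append((column, bq_type))
        return tuple(fields)

    df = pandas.DataFrame({"a": [1, 2], "b": [{"x": 1}, {"x": 2}]})
    print(to_bq_schema(df))  # warns about column 'b' (object dtype), prints None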
From ca303a6c17536164c445f40af38808319e868106 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Thu, 22 Aug 2019 11:20:09 -0700
Subject: [PATCH 8/8] Explicitly use unicode strings in tests.

---
 bigquery/tests/system.py           |  2 +-
 bigquery/tests/unit/test_client.py | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 1174980faada..1422c3c7cb60 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -882,7 +882,7 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
                         decimal.Decimal("99999999999999999999999999999.999999999"),
                     ],
                 ),
-                ("str_col", ["abc", None, "def"]),
+                ("str_col", [u"abc", None, u"def"]),
                 (
                     "time_col",
                     [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 2c156b36d7e4..1fd6d87487ae 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5468,7 +5468,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
                         dtype="datetime64[ns]",
                     ).dt.tz_localize(pytz.utc),
                 ),
-                ("string_col", ["abc", None, "def"]),
+                ("string_col", [u"abc", None, u"def"]),
                 ("bytes_col", [b"abc", b"def", None]),
             ]
         )
@@ -5526,7 +5526,7 @@ def test_load_table_from_dataframe_w_partial_schema_extra_types(self):
             [
                 ("int_col", [1, 2, 3]),
                 ("int_as_float_col", [1.0, float("nan"), 3.0]),
-                ("string_col", ["abc", None, "def"]),
+                ("string_col", [u"abc", None, u"def"]),
             ]
         )
         dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
@@ -5576,8 +5576,8 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
         client = self._make_client()
         df_data = collections.OrderedDict(
             [
-                ("string_col", ["abc", "def", "ghi"]),
-                ("unknown_col", ["jkl", None, "mno"]),
+                ("string_col", [u"abc", u"def", u"ghi"]),
+                ("unknown_col", [b"jkl", None, b"mno"]),
             ]
         )
         dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
@@ -5616,7 +5616,7 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET
-        assert tuple(sent_config.schema) == ()
+        assert sent_config.schema is None
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
@@ -5626,7 +5626,7 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}]
         dataframe = pandas.DataFrame(records, columns=["name", "age"])
         schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
         job_config = job.LoadJobConfig(schema=schema)
@@ -5672,7 +5672,7 @@ def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}]
         dataframe = pandas.DataFrame(records)
         schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
         job_config = job.LoadJobConfig(schema=schema)
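The u prefixes in this final patch are a Python 2 compatibility detail: there, a bare "abc" literal is a byte string, so a column the tests pair with a STRING field could actually hold bytes values, while under Python 3 the prefix is a no-op. The distinction the tests now encode explicitly:

    # Python 2: "abc" is bytes, u"abc" is text. Python 3: both are str (text).
    text_values = [u"abc", u"def"]  # portable text literals, paired with STRING
    byte_values = [b"abc", b"def"]  # explicit bytes on both versions, paired with BYTES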