
Commit

Allow subset of schema to be passed into load_table_from_dataframe. (#9064)

* Allow subset of schema to be passed into `load_table_from_dataframe`.

The types of any remaining columns will be auto-detected.

* Warn when it's not possible to determine a column type.
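
For context, a minimal usage sketch of the new behavior. The dataset and table names are placeholders and the DataFrame is illustrative; only the object-dtype column needs an explicit SchemaField, while the integer column's type is auto-detected.

import pandas
from google.cloud import bigquery

client = bigquery.Client()

# Illustrative DataFrame: "name" has an object dtype, which autodetection
# cannot map to a BigQuery type, while "age" (int64) can be detected.
dataframe = pandas.DataFrame({"name": ["Chip", "Dale"], "age": [2, 3]})

# Only "name" is listed; the type of "age" is determined from its dtype.
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("name", "STRING")]
)

table_ref = client.dataset("my_dataset").table("my_table")  # placeholder names
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config
)
load_job.result()  # Wait for the load job to finish.

Before this change, a schema passed via job_config was used as-is and effectively had to cover every column; now any listed fields override autodetection only for those columns.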
tswast authored Aug 22, 2019
1 parent bc9ea52 commit adefce3
Showing 4 changed files with 295 additions and 88 deletions.
41 changes: 31 additions & 10 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,37 +187,58 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def dataframe_to_bq_schema(dataframe):
def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.
TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
schema for a subset of columns.
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.
Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
The automatically determined schema. Returns None if the type of
any column cannot be determined.
"""
bq_schema = []
if bq_schema:
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)
bq_schema_index = {field.name: field for field in bq_schema}
else:
bq_schema_index = {}

bq_schema_out = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
# Use provided type from schema, if present.
bq_field = bq_schema_index.get(column)
if bq_field:
bq_schema_out.append(bq_field)
continue

# Otherwise, try to automatically determine the type based on the
# pandas dtype.
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if not bq_type:
warnings.warn("Unable to determine type of column '{}'.".format(column))
return None
bq_field = schema.SchemaField(column, bq_type)
bq_schema.append(bq_field)
return tuple(bq_schema)
bq_schema_out.append(bq_field)
return tuple(bq_schema_out)


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
@@ -255,7 +276,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
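To illustrate the merge logic in dataframe_to_bq_schema above: fields named in the passed-in bq_schema are used as-is, every other column falls back to dtype-based detection, and the function warns and returns None if any remaining column's type cannot be determined. A small sketch follows (column names are made up; _pandas_helpers is a private module, so this is illustrative rather than a supported API, and it assumes object dtypes are not in _PANDAS_DTYPE_TO_BQ in this version):

import pandas
from google.cloud.bigquery import SchemaField, _pandas_helpers

df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Without an override, the object-dtype "name" column cannot be mapped to a
# BigQuery type: the helper warns and returns None.
assert _pandas_helpers.dataframe_to_bq_schema(df, None) is None

# With a partial schema, "name" uses the provided field and "id" is
# auto-detected from its int64 dtype.
partial = (SchemaField("name", "STRING"),)
full = _pandas_helpers.dataframe_to_bq_schema(df, partial)
print(full)  # ("id" with an auto-detected integer type, "name" kept as STRING)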
21 changes: 3 additions & 18 deletions bigquery/google/cloud/bigquery/client.py
@@ -61,7 +61,6 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import _STRUCT_TYPES
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1532,28 +1531,14 @@ def load_table_from_dataframe(
if location is None:
location = self.location

if not job_config.schema:
autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)

# Only use an explicit schema if we were able to determine one
# matching the dataframe. If not, fallback to the pandas to_parquet
# method.
if autodetected_schema:
job_config.schema = autodetected_schema
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
dataframe, job_config.schema
)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
os.close(tmpfd)

try:
if job_config.schema:
for field in job_config.schema:
if field.field_type in _STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)

if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
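Tying the client change to the second commit-message bullet: when a column's type cannot be determined from either the provided schema or its dtype, the helper emits a warning and the job's schema is left unset. A hedged sketch of observing that warning (placeholder names; running it requires real credentials, and the message text is taken from the helper above):

import warnings

import pandas
from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset("my_dataset").table("my_table")  # placeholder names

# "notes" has an object dtype and no override, so its type is not determined.
dataframe = pandas.DataFrame({"notes": ["x", "y"]})

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    client.load_table_from_dataframe(dataframe, table_ref).result()

for warning in caught:
    print(warning.message)  # e.g. "Unable to determine type of column 'notes'."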
119 changes: 67 additions & 52 deletions bigquery/tests/system.py
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
)
num_rows = 100
nulls = [None] * num_rows
dataframe = pandas.DataFrame(
{
"bool_col": nulls,
"bytes_col": nulls,
"date_col": nulls,
"dt_col": nulls,
"float_col": nulls,
"geo_col": nulls,
"int_col": nulls,
"num_col": nulls,
"str_col": nulls,
"time_col": nulls,
"ts_col": nulls,
}
df_data = collections.OrderedDict(
[
("bool_col", nulls),
("bytes_col", nulls),
("date_col", nulls),
("dt_col", nulls),
("float_col", nulls),
("geo_col", nulls),
("int_col", nulls),
("num_col", nulls),
("str_col", nulls),
("time_col", nulls),
("ts_col", nulls),
]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
)

records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
dataframe = pandas.DataFrame(records)
dataframe = pandas.DataFrame(records, columns=["name", "age"])
job_config = bigquery.LoadJobConfig(schema=table_schema)
dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
# https://jira.apache.org/jira/browse/ARROW-2587
# bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
)
dataframe = pandas.DataFrame(
{
"bool_col": [True, None, False],
"bytes_col": [b"abc", None, b"def"],
"date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
"dt_col": [
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
"float_col": [float("-inf"), float("nan"), float("inf")],
"geo_col": [
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
"int_col": [-9223372036854775808, None, 9223372036854775807],
"num_col": [
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
"str_col": ["abc", None, "def"],
"time_col": [
datetime.time(0, 0, 0),
None,
datetime.time(23, 59, 59, 999999),
],
"ts_col": [
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
},
dtype="object",
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", [b"abc", None, b"def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
(
"dt_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
