
BigQuery: Allow subset of schema to be passed into load_table_from_dataframe. #9064

Merged · 9 commits · Aug 22, 2019
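With this change, a LoadJobConfig.schema that names only some of the DataFrame's columns is merged with autodetected types for the remaining columns. A minimal usage sketch of the new behavior — the dataset and table names are placeholders, and a `bigquery.Client` with default credentials is assumed:

import pandas
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project

dataframe = pandas.DataFrame({"name": ["Chip", "Dale"], "age": [2, 3]})

# "name" has dtype object, which cannot be autodetected, so its type is
# taken from the supplied schema; "age" (int64) is still autodetected
# as INTEGER.
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("name", "STRING")]
)

# "my_dataset" and "my_table" are hypothetical destination names.
table_ref = client.dataset("my_dataset").table("my_table")
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config
)
load_job.result()  # wait for the load job to finish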
49 changes: 35 additions & 14 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,37 +187,58 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def dataframe_to_bq_schema(dataframe):
def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.

TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
schema for a subset of columns.

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.

Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
The automatically determined schema. Returns None if the type of
any column cannot be determined.
Sequence[google.cloud.bigquery.schema.SchemaField]:
The automatically determined schema. Returns empty tuple if the
type of any column cannot be determined.
"""
bq_schema = []
if bq_schema:
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)
bq_schema_index = {field.name: field for field in bq_schema}
else:
bq_schema_index = {}

bq_schema_out = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
# Use provided type from schema, if present.
bq_field = bq_schema_index.get(column)
if bq_field:
bq_schema_out.append(bq_field)
continue

# Otherwise, try to automatically determine the type based on the
# pandas dtype.
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if not bq_type:
return None
warnings.warn("Unable to determine type of column '{}'.".format(column))
return ()
bq_field = schema.SchemaField(column, bq_type)
bq_schema.append(bq_field)
return tuple(bq_schema)
bq_schema_out.append(bq_field)
return tuple(bq_schema_out)


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
@@ -255,7 +276,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
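For reference, a rough sketch of how the reworked dataframe_to_bq_schema merges a caller-supplied schema with dtype-based autodetection. The module is private, so this is only to illustrate the merge logic; the column names are made up:

import pandas
from google.cloud.bigquery import schema
from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame({"id": [1, 2], "payload": ["a", "b"]})

# "payload" (object dtype) is taken from the supplied schema; "id" falls
# through to the dtype lookup (int64 -> INTEGER). A column whose dtype
# cannot be mapped now triggers a warning and an empty tuple is returned
# instead of None.
bq_schema = _pandas_helpers.dataframe_to_bq_schema(
    df, [schema.SchemaField("payload", "STRING")]
)
print(bq_schema)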
21 changes: 3 additions & 18 deletions bigquery/google/cloud/bigquery/client.py
@@ -61,7 +61,6 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import _STRUCT_TYPES
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1532,28 +1531,14 @@ def load_table_from_dataframe(
if location is None:
location = self.location

if not job_config.schema:
autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)

# Only use an explicit schema if we were able to determine one
# matching the dataframe. If not, fallback to the pandas to_parquet
# method.
if autodetected_schema:
job_config.schema = autodetected_schema
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
dataframe, job_config.schema
)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
os.close(tmpfd)

try:
if job_config.schema:
for field in job_config.schema:
if field.field_type in _STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)

if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
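The struct/RECORD check that used to live in load_table_from_dataframe now runs while the schema is resolved, before the temporary parquet file is created. A small sketch of the error a caller sees, again via the private helper; the "address" column and its nested field are hypothetical:

import pandas
from google.cloud.bigquery import schema
from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame({"address": [{"city": "Kirkland"}]})
struct_field = schema.SchemaField(
    "address",
    "RECORD",
    fields=[schema.SchemaField("city", "STRING")],
)

try:
    _pandas_helpers.dataframe_to_bq_schema(df, [struct_field])
except ValueError as exc:
    print(exc)  # struct (record) columns are not supported for upload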
119 changes: 67 additions & 52 deletions bigquery/tests/system.py
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
)
num_rows = 100
nulls = [None] * num_rows
dataframe = pandas.DataFrame(
{
"bool_col": nulls,
"bytes_col": nulls,
"date_col": nulls,
"dt_col": nulls,
"float_col": nulls,
"geo_col": nulls,
"int_col": nulls,
"num_col": nulls,
"str_col": nulls,
"time_col": nulls,
"ts_col": nulls,
}
df_data = collections.OrderedDict(
[
("bool_col", nulls),
("bytes_col", nulls),
("date_col", nulls),
("dt_col", nulls),
("float_col", nulls),
("geo_col", nulls),
("int_col", nulls),
("num_col", nulls),
("str_col", nulls),
("time_col", nulls),
("ts_col", nulls),
]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
)

records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
dataframe = pandas.DataFrame(records)
dataframe = pandas.DataFrame(records, columns=["name", "age"])
job_config = bigquery.LoadJobConfig(schema=table_schema)
dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
# https://jira.apache.org/jira/browse/ARROW-2587
# bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
)
dataframe = pandas.DataFrame(
{
"bool_col": [True, None, False],
"bytes_col": [b"abc", None, b"def"],
"date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
"dt_col": [
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
"float_col": [float("-inf"), float("nan"), float("inf")],
"geo_col": [
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
"int_col": [-9223372036854775808, None, 9223372036854775807],
"num_col": [
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
"str_col": ["abc", None, "def"],
"time_col": [
datetime.time(0, 0, 0),
None,
datetime.time(23, 59, 59, 999999),
],
"ts_col": [
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
},
dtype="object",
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", [b"abc", None, b"def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
(
"dt_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", ["abc", None, "def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)