diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index d508929e5d6a..5ac0505e91ae 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -208,7 +208,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
 
 
-def dataframe_to_parquet(dataframe, bq_schema, filepath):
+def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
     """Write dataframe as a Parquet file, according to the desired BQ schema.
 
     This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -222,12 +222,17 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath):
             columns in the DataFrame.
         filepath (str):
             Path to write Parquet file to.
+        parquet_compression (str):
+            (optional) The compression codec used by the
+            ``pyarrow.parquet.write_table`` serializing method. Defaults to
+            "SNAPPY".
+            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
     """
     if pyarrow is None:
         raise ValueError("pyarrow is required for BigQuery schema conversion.")
 
     arrow_table = dataframe_to_arrow(dataframe, bq_schema)
-    pyarrow.parquet.write_table(arrow_table, filepath)
+    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
 
 
 def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index b8ce2d5a33f3..04c596975eec 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1449,6 +1449,7 @@ def load_table_from_dataframe(
         location=None,
         project=None,
         job_config=None,
+        parquet_compression="snappy",
     ):
         """Upload the contents of a table from a pandas DataFrame.
 
@@ -1491,6 +1492,20 @@ def load_table_from_dataframe(
                 column names matching those of the dataframe. The BigQuery
                 schema is used to determine the correct data type conversion.
                 Indexes are not loaded. Requires the :mod:`pyarrow` library.
+            parquet_compression (str):
+                [Beta] The compression method to use when temporarily
+                serializing ``dataframe`` to a parquet file.
+
+                If ``pyarrow`` and job config schema are used, the argument
+                is directly passed as the ``compression`` argument to the
+                underlying ``pyarrow.parquet.write_table()`` method (the
+                default value "snappy" gets converted to uppercase).
+                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+                If either ``pyarrow`` or job config schema are missing, the
+                argument is directly passed as the ``compression`` argument
+                to the underlying ``DataFrame.to_parquet()`` method.
+                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -1515,8 +1530,14 @@ def load_table_from_dataframe(
 
         try:
             if pyarrow and job_config.schema:
+                if parquet_compression == "snappy":  # adjust the default value
+                    parquet_compression = parquet_compression.upper()
+
                 _pandas_helpers.dataframe_to_parquet(
-                    dataframe, job_config.schema, tmppath
+                    dataframe,
+                    job_config.schema,
+                    tmppath,
+                    parquet_compression=parquet_compression,
                 )
             else:
                 if job_config.schema:
@@ -1527,7 +1548,7 @@ def load_table_from_dataframe(
                         PendingDeprecationWarning,
                         stacklevel=2,
                     )
-                dataframe.to_parquet(tmppath)
+                dataframe.to_parquet(tmppath, compression=parquet_compression)
 
             with open(tmppath, "rb") as parquet_file:
                 return self.load_table_from_file(
diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index 9348635f2dc6..6aad587837b4 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -17,6 +17,8 @@
 import functools
 import warnings
 
+import mock
+
 try:
     import pandas
 except ImportError:  # pragma: NO COVER
@@ -613,3 +615,23 @@ def test_dataframe_to_parquet_w_missing_columns(module_under_test, monkeypatch):
             pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None
         )
     assert "columns in schema must match" in str(exc_context.value)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
+def test_dataframe_to_parquet_compression_method(module_under_test):
+    bq_schema = (schema.SchemaField("field00", "STRING"),)
+    dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})
+
+    write_table_patch = mock.patch.object(
+        module_under_test.pyarrow.parquet, "write_table", autospec=True
+    )
+
+    with write_table_patch as fake_write_table:
+        module_under_test.dataframe_to_parquet(
+            dataframe, bq_schema, None, parquet_compression="ZSTD"
+        )
+
+    call_args = fake_write_table.call_args
+    assert call_args is not None
+    assert call_args.kwargs.get("compression") == "ZSTD"
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 2be40a52e1fc..c4e9c5e830ac 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5375,6 +5375,66 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
         assert sent_config.source_format == job.SourceFormat.PARQUET
         assert tuple(sent_config.schema) == schema
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        dataframe = pandas.DataFrame(records)
+        schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
+        job_config = job.LoadJobConfig(schema=schema)
+
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+        to_parquet_patch = mock.patch(
+            "google.cloud.bigquery.client._pandas_helpers.dataframe_to_parquet",
+            autospec=True,
+        )
+
+        with load_patch, to_parquet_patch as fake_to_parquet:
+            client.load_table_from_dataframe(
+                dataframe,
+                self.TABLE_REF,
+                job_config=job_config,
+                location=self.LOCATION,
+                parquet_compression="LZ4",
+            )
+
+        call_args = fake_to_parquet.call_args
+        assert call_args is not None
+        assert call_args.kwargs.get("parquet_compression") == "LZ4"
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
+        client = self._make_client()
+        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        dataframe = pandas.DataFrame(records)
+
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+        pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None)
+        to_parquet_patch = mock.patch.object(
+            dataframe, "to_parquet", wraps=dataframe.to_parquet
+        )
+
+        with load_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
+            client.load_table_from_dataframe(
+                dataframe,
+                self.TABLE_REF,
+                location=self.LOCATION,
+                parquet_compression="gzip",
+            )
+
+        call_args = to_parquet_spy.call_args
+        assert call_args is not None
+        assert call_args.kwargs.get("compression") == "gzip"
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_nulls(self):
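Usage note: a minimal sketch of how a caller might exercise the new ``parquet_compression`` argument once this change lands. The project, dataset, and table identifiers below are placeholders and not part of the patch; with ``pyarrow`` installed and a schema on the job config, the codec is forwarded to ``pyarrow.parquet.write_table()``, otherwise it is forwarded to ``DataFrame.to_parquet()``.

    # Hypothetical usage sketch; dataset/table IDs are placeholders.
    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = client.dataset("my_dataset").table("my_table")  # placeholder IDs

    dataframe = pandas.DataFrame(
        [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
    )
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("age", "INTEGER"),
        ]
    )

    # With pyarrow and a schema present, "GZIP" is passed straight through to
    # pyarrow.parquet.write_table(); without pyarrow it would be handed to
    # DataFrame.to_parquet() instead.
    load_job = client.load_table_from_dataframe(
        dataframe, table_ref, job_config=job_config, parquet_compression="GZIP"
    )
    load_job.result()  # block until the load job finishes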