diff --git a/bigquery/docs/snippets.py b/bigquery/docs/snippets.py
index 9b4218286402..51b9d9c3fc1c 100644
--- a/bigquery/docs/snippets.py
+++ b/bigquery/docs/snippets.py
@@ -2741,52 +2741,5 @@ def test_list_rows_as_dataframe(client):
     assert len(df) == table.num_rows  # verify the number of rows
 
 
-@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
-def test_load_table_from_dataframe(client, to_delete, parquet_engine):
-    if parquet_engine == "pyarrow" and pyarrow is None:
-        pytest.skip("Requires `pyarrow`")
-    if parquet_engine == "fastparquet" and fastparquet is None:
-        pytest.skip("Requires `fastparquet`")
-
-    pandas.set_option("io.parquet.engine", parquet_engine)
-
-    dataset_id = "load_table_from_dataframe_{}".format(_millis())
-    dataset = bigquery.Dataset(client.dataset(dataset_id))
-    client.create_dataset(dataset)
-    to_delete.append(dataset)
-
-    # [START bigquery_load_table_dataframe]
-    # from google.cloud import bigquery
-    # import pandas
-    # client = bigquery.Client()
-    # dataset_id = 'my_dataset'
-
-    dataset_ref = client.dataset(dataset_id)
-    table_ref = dataset_ref.table("monty_python")
-    records = [
-        {"title": u"The Meaning of Life", "release_year": 1983},
-        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
-        {"title": u"Life of Brian", "release_year": 1979},
-        {"title": u"And Now for Something Completely Different", "release_year": 1971},
-    ]
-    # Optionally set explicit indices.
-    # If indices are not specified, a column will be created for the default
-    # indices created by pandas.
-    index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"]
-    dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id"))
-
-    job = client.load_table_from_dataframe(dataframe, table_ref, location="US")
-
-    job.result()  # Waits for table load to complete.
-
-    assert job.state == "DONE"
-    table = client.get_table(table_ref)
-    assert table.num_rows == 4
-    # [END bigquery_load_table_dataframe]
-    column_names = [field.name for field in table.schema]
-    assert sorted(column_names) == ["release_year", "title", "wikidata_id"]
-
-
 if __name__ == "__main__":
     pytest.main()
diff --git a/bigquery/docs/usage/pandas.rst b/bigquery/docs/usage/pandas.rst
index 9504bd19673a..9db98dfbbccb 100644
--- a/bigquery/docs/usage/pandas.rst
+++ b/bigquery/docs/usage/pandas.rst
@@ -55,7 +55,7 @@ install the BigQuery python client library with :mod:`pandas` and
 The following example demonstrates how to create a :class:`pandas.DataFrame`
 and load it into a new table:
 
-.. literalinclude:: ../snippets.py
+.. literalinclude:: ../samples/load_table_dataframe.py
    :language: python
    :dedent: 4
    :start-after: [START bigquery_load_table_dataframe]
diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py
index c41ceb6b0306..bda8c5611435 100644
--- a/bigquery/google/cloud/bigquery/__init__.py
+++ b/bigquery/google/cloud/bigquery/__init__.py
@@ -36,6 +36,7 @@
 from google.cloud.bigquery.dataset import AccessEntry
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery import enums
 from google.cloud.bigquery.enums import StandardSqlDataTypes
 from google.cloud.bigquery.external_config import ExternalConfig
 from google.cloud.bigquery.external_config import BigtableOptions
@@ -124,6 +125,7 @@
     "GoogleSheetsOptions",
     "DEFAULT_RETRY",
     # Enum Constants
+    "enums",
     "Compression",
     "CreateDisposition",
     "DestinationFormat",
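A note on the `__init__.py` hunk above: re-exporting `enums` is what lets the new sample further down write `bigquery.enums.SqlTypeNames.STRING` without importing the submodule itself. A minimal sketch of the spelling this enables, not part of the patch, assuming the patched library is installed:

```python
# Quick check of the new re-export (not part of the patch).
from google.cloud import bigquery

# The new sample relies on passing this enum member where a type name
# string is expected.
field = bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING)
print(field.name)  # title
```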
diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 2d2fb8af24d3..5e73c9f58e22 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,6 +187,49 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
+def get_column_or_index(dataframe, name):
+    """Return a column or index as a pandas series."""
+    if name in dataframe.columns:
+        return dataframe[name].reset_index(drop=True)
+
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        if name in dataframe.index.names:
+            return (
+                dataframe.index.get_level_values(name)
+                .to_series()
+                .reset_index(drop=True)
+            )
+    else:
+        if name == dataframe.index.name:
+            return dataframe.index.to_series().reset_index(drop=True)
+
+    raise ValueError("column or index '{}' not found.".format(name))
+
+
+def list_columns_and_indexes(dataframe):
+    """Return all index and column names with dtypes.
+
+    Returns:
+        Sequence[Tuple[str, dtype]]:
+            A list of (name, dtype) pairs: named indexes first (in index
+            order), followed by columns. If an index is missing a name or
+            has the same name as a column, the index is omitted.
+    """
+    column_names = frozenset(dataframe.columns)
+    columns_and_indexes = []
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        for name in dataframe.index.names:
+            if name and name not in column_names:
+                values = dataframe.index.get_level_values(name)
+                columns_and_indexes.append((name, values.dtype))
+    else:
+        if dataframe.index.name and dataframe.index.name not in column_names:
+            columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
+
+    columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
+    return columns_and_indexes
+
+
 def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
@@ -217,7 +260,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     bq_schema_unused = set()
 
     bq_schema_out = []
-    for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+    for column, dtype in list_columns_and_indexes(dataframe):
         # Use provided type from schema, if present.
         bq_field = bq_schema_index.get(column)
         if bq_field:
@@ -229,7 +272,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
         # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
-            warnings.warn("Unable to determine type of column '{}'.".format(column))
+            warnings.warn(u"Unable to determine type of column '{}'.".format(column))
             return None
         bq_field = schema.SchemaField(column, bq_type)
         bq_schema_out.append(bq_field)
@@ -238,7 +281,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # column, but it was not found.
     if bq_schema_unused:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 bq_schema_unused
             )
         )
@@ -261,20 +304,25 @@ def dataframe_to_arrow(dataframe, bq_schema):
             BigQuery schema.
     """
     column_names = set(dataframe.columns)
+    column_and_index_names = set(
+        name for name, _ in list_columns_and_indexes(dataframe)
+    )
     bq_field_names = set(field.name for field in bq_schema)
 
-    extra_fields = bq_field_names - column_names
+    extra_fields = bq_field_names - column_and_index_names
     if extra_fields:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 extra_fields
             )
         )
 
+    # It's okay for indexes to be missing from bq_schema, but it's not okay to
+    # be missing columns.
     missing_fields = column_names - bq_field_names
     if missing_fields:
         raise ValueError(
-            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
+            u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
         )
 
     arrow_arrays = []
@@ -283,7 +331,9 @@ def dataframe_to_arrow(dataframe, bq_schema):
     for bq_field in bq_schema:
         arrow_fields.append(bq_to_arrow_field(bq_field))
         arrow_names.append(bq_field.name)
-        arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))
+        arrow_arrays.append(
+            bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
+        )
 
     if all((field is not None for field in arrow_fields)):
         return pyarrow.Table.from_arrays(
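The two new helpers are the heart of this change: `list_columns_and_indexes` decides which names participate in schema detection, and `get_column_or_index` materializes either kind of name as a `Series`. A minimal sketch of the intended behavior, not part of the patch; the DataFrame and its values are made up for illustration:

```python
import pandas

from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame(
    {"population": [1000, 2000]},
    index=pandas.Index(["tokyo", "osaka"], name="city"),
)

# Named indexes are listed before columns, each paired with its dtype:
#   [('city', dtype('O')), ('population', dtype('int64'))]
print(_pandas_helpers.list_columns_and_indexes(df))

# Either kind of name resolves to a Series with a default RangeIndex.
print(_pandas_helpers.get_column_or_index(df, "city").tolist())        # ['tokyo', 'osaka']
print(_pandas_helpers.get_column_or_index(df, "population").tolist())  # [1000, 2000]
```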
diff --git a/bigquery/samples/load_table_dataframe.py b/bigquery/samples/load_table_dataframe.py
new file mode 100644
index 000000000000..69eeb6ef89d0
--- /dev/null
+++ b/bigquery/samples/load_table_dataframe.py
@@ -0,0 +1,73 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def load_table_dataframe(client, table_id):
+    # [START bigquery_load_table_dataframe]
+    from google.cloud import bigquery
+    import pandas
+
+    # TODO(developer): Construct a BigQuery client object.
+    # client = bigquery.Client()
+
+    # TODO(developer): Set table_id to the ID of the table to create.
+    # table_id = "your-project.your_dataset.your_table_name"
+
+    records = [
+        {"title": u"The Meaning of Life", "release_year": 1983},
+        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
+        {"title": u"Life of Brian", "release_year": 1979},
+        {"title": u"And Now for Something Completely Different", "release_year": 1971},
+    ]
+    dataframe = pandas.DataFrame(
+        records,
+        # In the loaded table, the column order reflects the order of the
+        # columns in the DataFrame.
+        columns=["title", "release_year"],
+        # Optionally, set a named index, which can also be written to the
+        # BigQuery table.
+        index=pandas.Index(
+            [u"Q24980", u"Q25043", u"Q24953", u"Q16403"], name="wikidata_id"
+        ),
+    )
+    job_config = bigquery.LoadJobConfig(
+        # Specify a (partial) schema. All columns are always written to the
+        # table. The schema is used to assist in data type definitions.
+        schema=[
+            # Specify the type of columns whose type cannot be auto-detected. For
+            # example the "title" column uses pandas dtype "object", so its
+            # data type is ambiguous.
+            bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
+            # Indexes are written if included in the schema by name.
+            bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
+        ],
+        # Optionally, set the write disposition. BigQuery appends loaded rows
+        # to an existing table by default, but with WRITE_TRUNCATE write
+        # disposition it replaces the table with the loaded data.
+        write_disposition="WRITE_TRUNCATE",
+    )
+
+    job = client.load_table_from_dataframe(
+        dataframe, table_id, job_config=job_config, location="US"
+    )
+    job.result()  # Waits for table load to complete.
+
+    table = client.get_table(table_id)
+    print(
+        "Loaded {} rows and {} columns to {}".format(
+            table.num_rows, len(table.schema), table_id
+        )
+    )
+    # [END bigquery_load_table_dataframe]
+    return table
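Worth calling out in the sample above: only ambiguous columns need to be pinned in the partial schema. A standalone sketch of why, not part of the patch; the dtype names are what pandas reports, and the mapping to BigQuery types lives in `_PANDAS_DTYPE_TO_BQ` in `_pandas_helpers.py`:

```python
import pandas

records = [{"title": u"Life of Brian", "release_year": 1979}]
dataframe = pandas.DataFrame(records)

# int64 has an unambiguous mapping, so "release_year" becomes INTEGER on its own.
print(dataframe.dtypes["release_year"].name)  # int64
# object-dtype values would trigger the "Unable to determine type" warning,
# which is why the sample pins "title" (and the index) to STRING.
print(dataframe.dtypes["title"].name)  # object
```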
diff --git a/bigquery/samples/tests/test_load_table_dataframe.py b/bigquery/samples/tests/test_load_table_dataframe.py
new file mode 100644
index 000000000000..d553d449a525
--- /dev/null
+++ b/bigquery/samples/tests/test_load_table_dataframe.py
@@ -0,0 +1,30 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from .. import load_table_dataframe
+
+
+pytest.importorskip("pandas")
+pytest.importorskip("pyarrow")
+
+
+def test_load_table_dataframe(capsys, client, random_table_id):
+    table = load_table_dataframe.load_table_dataframe(client, random_table_id)
+    out, _ = capsys.readouterr()
+    assert "Loaded 4 rows and 3 columns" in out
+
+    column_names = [field.name for field in table.schema]
+    assert column_names == ["wikidata_id", "title", "release_year"]
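This test leans on `client` and `random_table_id` fixtures from the samples' conftest.py, which is not part of this diff. A rough sketch of what such fixtures typically provide; the dataset name and cleanup strategy here are assumptions, not the repo's actual conftest:

```python
import uuid

import pytest
from google.api_core import exceptions
from google.cloud import bigquery


@pytest.fixture(scope="module")
def client():
    return bigquery.Client()


@pytest.fixture
def random_table_id(client):
    # Hypothetical dataset name; the real conftest defines its own.
    table_id = "{}.{}.table_{}".format(
        client.project, "python_samples_tests", uuid.uuid4().hex
    )
    yield table_id
    try:
        client.delete_table(table_id)  # clean up after the test run
    except exceptions.NotFound:
        pass
```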
diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index facfb79b3ccb..b539abe9a89a 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import datetime
 import decimal
 import functools
@@ -21,6 +22,8 @@
 
 try:
     import pandas
+    import pandas.api.types
+    import pandas.testing
 except ImportError:  # pragma: NO COVER
     pandas = None
 
 try:
@@ -511,9 +514,262 @@ def test_bq_to_arrow_schema_w_unknown_type(module_under_test):
     assert actual is None
 
 
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_not_found(module_under_test):
+    dataframe = pandas.DataFrame({"not_the_column_youre_looking_for": [1, 2, 3]})
+    with pytest.raises(ValueError, match="col_is_missing"):
+        module_under_test.get_column_or_index(dataframe, "col_is_missing")
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_multiindex_not_found(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3, 4, 5, 6]},
+        index=pandas.MultiIndex.from_tuples(
+            [("a", 0), ("a", 1), ("b", 0), ("b", 1), ("c", 0), ("c", 1)]
+        ),
+    )
+    with pytest.raises(ValueError, match="not_in_df"):
+        module_under_test.get_column_or_index(dataframe, "not_in_df")
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_both_prefers_column(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"some_name": [1, 2, 3]}, index=pandas.Index([0, 1, 2], name="some_name")
+    )
+    series = module_under_test.get_column_or_index(dataframe, "some_name")
+    expected = pandas.Series([1, 2, 3], name="some_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_column(module_under_test):
+    dataframe = pandas.DataFrame({"column_name": [1, 2, 3], "other_column": [4, 5, 6]})
+    series = module_under_test.get_column_or_index(dataframe, "column_name")
+    expected = pandas.Series([1, 2, 3], name="column_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_named_index(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3]}, index=pandas.Index([4, 5, 6], name="index_name")
+    )
+    series = module_under_test.get_column_or_index(dataframe, "index_name")
+    expected = pandas.Series([4, 5, 6], name="index_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_datetimeindex(module_under_test):
+    datetimes = [
+        datetime.datetime(2000, 1, 2, 3, 4, 5, 101),
+        datetime.datetime(2006, 7, 8, 9, 10, 11, 202),
+        datetime.datetime(2012, 1, 14, 15, 16, 17, 303),
+    ]
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3]},
+        index=pandas.DatetimeIndex(datetimes, name="index_name"),
+    )
+    series = module_under_test.get_column_or_index(dataframe, "index_name")
+    expected = pandas.Series(datetimes, name="index_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_multiindex(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3, 4, 5, 6]},
+        index=pandas.MultiIndex.from_tuples(
+            [("a", 0), ("a", 1), ("b", 0), ("b", 1), ("c", 0), ("c", 1)],
+            names=["letters", "numbers"],
+        ),
+    )
+
+    series = module_under_test.get_column_or_index(dataframe, "letters")
+    expected = pandas.Series(["a", "a", "b", "b", "c", "c"], name="letters")
+    pandas.testing.assert_series_equal(series, expected)
+
+    series = module_under_test.get_column_or_index(dataframe, "numbers")
+    expected = pandas.Series([0, 1, 0, 1, 0, 1], name="numbers")
+    pandas.testing.assert_series_equal(series, expected)
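One detail the tests above rely on: `assert_series_equal` compares indexes too, so `get_column_or_index` has to return series re-anchored on a default `RangeIndex`. A tiny standalone sketch of what `reset_index(drop=True)` does to a series carved out of an index (values made up, not from the patch):

```python
import pandas

idx = pandas.Index([4, 5, 6], name="index_name")
series = idx.to_series()  # both the values AND the index are 4, 5, 6 here

series = series.reset_index(drop=True)  # back to a default RangeIndex
print(list(series.index))  # [0, 1, 2]
print(list(series))        # [4, 5, 6]
```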
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_without_named_index(module_under_test):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
+            ("c_series", ["a", "b", "c", "d"]),
+        ]
+    )
+    dataframe = pandas.DataFrame(df_data)
+
+    columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
+    expected = [
+        ("a_series", pandas.api.types.pandas_dtype("int64")),
+        ("b_series", pandas.api.types.pandas_dtype("float64")),
+        ("c_series", pandas.api.types.pandas_dtype("object")),
+    ]
+    assert columns_and_indexes == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_with_named_index_same_as_column_name(
+    module_under_test
+):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
+            ("c_series", ["a", "b", "c", "d"]),
+        ]
+    )
+    dataframe = pandas.DataFrame(
+        df_data,
+        # Use same name as an integer column but a different datatype so that
+        # we can verify that the column is listed but the index isn't.
+        index=pandas.Index([0.1, 0.2, 0.3, 0.4], name="a_series"),
+    )
+
+    columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
+    expected = [
+        ("a_series", pandas.api.types.pandas_dtype("int64")),
+        ("b_series", pandas.api.types.pandas_dtype("float64")),
+        ("c_series", pandas.api.types.pandas_dtype("object")),
+    ]
+    assert columns_and_indexes == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_with_named_index(module_under_test):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
+            ("c_series", ["a", "b", "c", "d"]),
+        ]
+    )
+    dataframe = pandas.DataFrame(
+        df_data, index=pandas.Index([4, 5, 6, 7], name="a_index")
+    )
+
+    columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
+    expected = [
+        ("a_index", pandas.api.types.pandas_dtype("int64")),
+        ("a_series", pandas.api.types.pandas_dtype("int64")),
+        ("b_series", pandas.api.types.pandas_dtype("float64")),
+        ("c_series", pandas.api.types.pandas_dtype("object")),
+    ]
+    assert columns_and_indexes == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_with_multiindex(module_under_test):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
+            ("c_series", ["a", "b", "c", "d"]),
+        ]
+    )
+    dataframe = pandas.DataFrame(
+        df_data,
+        index=pandas.MultiIndex.from_tuples(
+            [(0, 0, 41), (0, 0, 42), (1, 0, 41), (1, 1, 41)],
+            names=[
+                "a_index",
+                # Use same name as column, but different dtype so we can verify
+                # the column type is included.
+                "b_series",
+                "c_index",
+            ],
+        ),
+    )
+
+    columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
+    expected = [
+        ("a_index", pandas.api.types.pandas_dtype("int64")),
+        ("c_index", pandas.api.types.pandas_dtype("int64")),
+        ("a_series", pandas.api.types.pandas_dtype("int64")),
+        ("b_series", pandas.api.types.pandas_dtype("float64")),
+        ("c_series", pandas.api.types.pandas_dtype("object")),
+    ]
+    assert columns_and_indexes == expected
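The clash rule these tests pin down is easy to state: a named index that shares a name with a column is dropped from the listing, and the column wins (the same preference `get_column_or_index` applies). A one-DataFrame sketch with made-up values, not part of the patch:

```python
import pandas

from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame(
    {"a": [1, 2]}, index=pandas.Index([0.1, 0.2], name="a")
)

# Only the int64 column survives; the float64 index of the same name is omitted.
print(_pandas_helpers.list_columns_and_indexes(df))  # [('a', dtype('int64'))]
```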
+ "b_series", + "c_index", + ], + ), + ) + + columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe) + expected = [ + ("a_index", pandas.api.types.pandas_dtype("int64")), + ("c_index", pandas.api.types.pandas_dtype("int64")), + ("a_series", pandas.api.types.pandas_dtype("int64")), + ("b_series", pandas.api.types.pandas_dtype("float64")), + ("c_series", pandas.api.types.pandas_dtype("object")), + ] + assert columns_and_indexes == expected + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_dataframe_to_arrow_with_multiindex(module_under_test): + bq_schema = ( + schema.SchemaField("str_index", "STRING"), + # int_index is intentionally omitted, to verify that it's okay to be + # missing indexes from the schema. + schema.SchemaField("dt_index", "DATETIME"), + schema.SchemaField("int_col", "INTEGER"), + schema.SchemaField("nullable_int_col", "INTEGER"), + schema.SchemaField("str_col", "STRING"), + ) + df_data = collections.OrderedDict( + [ + ("int_col", [1, 2, 3, 4, 5, 6]), + ("nullable_int_col", [6.0, float("nan"), 7.0, float("nan"), 8.0, 9.0]), + ("str_col", ["apple", "banana", "cherry", "durian", "etrog", "fig"]), + ] + ) + df_index = pandas.MultiIndex.from_tuples( + [ + ("a", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("a", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ("a", 1, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("b", 1, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ("b", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("b", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ], + names=["str_index", "int_index", "dt_index"], + ) + dataframe = pandas.DataFrame(df_data, index=df_index) + + arrow_table = module_under_test.dataframe_to_arrow(dataframe, bq_schema) + + assert arrow_table.schema.names == [ + "str_index", + "dt_index", + "int_col", + "nullable_int_col", + "str_col", + ] + arrow_data = arrow_table.to_pydict() + assert arrow_data["str_index"] == ["a", "a", "a", "b", "b", "b"] + expected_dt_index = [ + pandas.Timestamp(dt) + for dt in ( + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + ) + ] + assert arrow_data["dt_index"] == expected_dt_index + assert arrow_data["int_col"] == [1, 2, 3, 4, 5, 6] + assert arrow_data["nullable_int_col"] == [6, None, 7, None, 8, 9] + assert arrow_data["str_col"] == [ + "apple", + "banana", + "cherry", + "durian", + "etrog", + "fig", + ] + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_dataframe_to_arrow_w_required_fields(module_under_test): +def test_dataframe_to_arrow_with_required_fields(module_under_test): bq_schema = ( schema.SchemaField("field01", "STRING", mode="REQUIRED"), schema.SchemaField("field02", "BYTES", mode="REQUIRED"), @@ -568,7 +824,7 @@ def test_dataframe_to_arrow_w_required_fields(module_under_test): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_dataframe_to_arrow_w_unknown_type(module_under_test): +def test_dataframe_to_arrow_with_unknown_type(module_under_test): bq_schema = ( schema.SchemaField("field00", "UNKNOWN_TYPE"), schema.SchemaField("field01", 
"STRING"),