From 14e6baa8eab7c6ff40fc6eab8f5578c22ce75da5 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Thu, 22 Aug 2019 17:04:07 -0700
Subject: [PATCH 1/9] Allow specifying index data type in partial schema to
 `load_table_from_dataframe`.

If an index (or level of a multi-index) has a name and is present in the
schema passed to `load_table_from_dataframe`, then that index will be
serialized and written to the table. Otherwise, the index is omitted from
the serialized table.
---
 bigquery/docs/snippets.py                      | 47 --------------
 bigquery/google/cloud/bigquery/__init__.py     |  2 +
 .../google/cloud/bigquery/_pandas_helpers.py   | 48 +++++++++++++-
 bigquery/samples/load_table_dataframe.py       | 63 +++++++++++++++++++
 .../tests/test_load_table_dataframe.py         | 28 +++++++++
 5 files changed, 138 insertions(+), 50 deletions(-)
 create mode 100644 bigquery/samples/load_table_dataframe.py
 create mode 100644 bigquery/samples/tests/test_load_table_dataframe.py

diff --git a/bigquery/docs/snippets.py b/bigquery/docs/snippets.py
index 9b4218286402..51b9d9c3fc1c 100644
--- a/bigquery/docs/snippets.py
+++ b/bigquery/docs/snippets.py
@@ -2741,52 +2741,5 @@ def test_list_rows_as_dataframe(client):
     assert len(df) == table.num_rows  # verify the number of rows
 
 
-@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
-def test_load_table_from_dataframe(client, to_delete, parquet_engine):
-    if parquet_engine == "pyarrow" and pyarrow is None:
-        pytest.skip("Requires `pyarrow`")
-    if parquet_engine == "fastparquet" and fastparquet is None:
-        pytest.skip("Requires `fastparquet`")
-
-    pandas.set_option("io.parquet.engine", parquet_engine)
-
-    dataset_id = "load_table_from_dataframe_{}".format(_millis())
-    dataset = bigquery.Dataset(client.dataset(dataset_id))
-    client.create_dataset(dataset)
-    to_delete.append(dataset)
-
-    # [START bigquery_load_table_dataframe]
-    # from google.cloud import bigquery
-    # import pandas
-    # client = bigquery.Client()
-    # dataset_id = 'my_dataset'
-
-    dataset_ref = client.dataset(dataset_id)
-    table_ref = dataset_ref.table("monty_python")
-    records = [
-        {"title": u"The Meaning of Life", "release_year": 1983},
-        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
-        {"title": u"Life of Brian", "release_year": 1979},
-        {"title": u"And Now for Something Completely Different", "release_year": 1971},
-    ]
-    # Optionally set explicit indices.
-    # If indices are not specified, a column will be created for the default
-    # indices created by pandas.
-    index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"]
-    dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id"))
-
-    job = client.load_table_from_dataframe(dataframe, table_ref, location="US")
-
-    job.result()  # Waits for table load to complete.
-
-    assert job.state == "DONE"
-    table = client.get_table(table_ref)
-    assert table.num_rows == 4
-    # [END bigquery_load_table_dataframe]
-    column_names = [field.name for field in table.schema]
-    assert sorted(column_names) == ["release_year", "title", "wikidata_id"]
-
-
 if __name__ == "__main__":
     pytest.main()
diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py
index c41ceb6b0306..bda8c5611435 100644
--- a/bigquery/google/cloud/bigquery/__init__.py
+++ b/bigquery/google/cloud/bigquery/__init__.py
@@ -36,6 +36,7 @@
 from google.cloud.bigquery.dataset import AccessEntry
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery import enums
 from google.cloud.bigquery.enums import StandardSqlDataTypes
 from google.cloud.bigquery.external_config import ExternalConfig
 from google.cloud.bigquery.external_config import BigtableOptions
@@ -124,6 +125,7 @@
     "GoogleSheetsOptions",
     "DEFAULT_RETRY",
     # Enum Constants
+    "enums",
     "Compression",
     "CreateDisposition",
     "DestinationFormat",
diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 2d2fb8af24d3..c2845f46357f 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,6 +187,28 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
+def _columns_and_indexes(dataframe):
+    """Return all index and column names with dtypes.
+
+    Returns:
+        Sequence[Tuple[str, dtype]]:
+            Returns a list of indexes and column names with
+            corresponding dtypes.
+    """
+    columns_and_indexes = []
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        for name in dataframe.index.names:
+            if name:
+                values = dataframe.index.get_level_values(name)
+                columns_and_indexes.append((name, values.dtype))
+    else:
+        if dataframe.index.name:
+            columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
+
+    columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
+    return columns_and_indexes
+
+
 def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
@@ -217,7 +239,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     bq_schema_unused = set()
 
     bq_schema_out = []
-    for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+    for column, dtype in _columns_and_indexes(dataframe):
         # Use provided type from schema, if present.
         bq_field = bq_schema_index.get(column)
         if bq_field:
@@ -245,6 +267,21 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     return tuple(bq_schema_out)
 
 
+def _column_or_index(dataframe, name):
+    """Return a column or index as a pandas series."""
+    if name in dataframe.columns:
+        return dataframe[name]
+
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        if name in dataframe.index.names:
+            return dataframe.index.get_level_values(name)
+    else:
+        if name == dataframe.index.name:
+            return dataframe.index.to_series()
+
+    raise ValueError("column or index '{}' not found.".format(name))
+
+
 def dataframe_to_arrow(dataframe, bq_schema):
     """Convert pandas dataframe to Arrow table, using BigQuery schema.
 
@@ -261,9 +298,10 @@ def dataframe_to_arrow(dataframe, bq_schema):
             BigQuery schema.
""" column_names = set(dataframe.columns) + column_and_index_names = set(name for name, _ in _columns_and_indexes(dataframe)) bq_field_names = set(field.name for field in bq_schema) - extra_fields = bq_field_names - column_names + extra_fields = bq_field_names - column_and_index_names if extra_fields: raise ValueError( "bq_schema contains fields not present in dataframe: {}".format( @@ -271,6 +309,8 @@ def dataframe_to_arrow(dataframe, bq_schema): ) ) + # It's okay for indexes to be missing from bq_schema, but it's not okay to + # be missing columns. missing_fields = column_names - bq_field_names if missing_fields: raise ValueError( @@ -283,7 +323,9 @@ def dataframe_to_arrow(dataframe, bq_schema): for bq_field in bq_schema: arrow_fields.append(bq_to_arrow_field(bq_field)) arrow_names.append(bq_field.name) - arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field)) + arrow_arrays.append( + bq_to_arrow_array(_column_or_index(dataframe, bq_field.name), bq_field) + ) if all((field is not None for field in arrow_fields)): return pyarrow.Table.from_arrays( diff --git a/bigquery/samples/load_table_dataframe.py b/bigquery/samples/load_table_dataframe.py new file mode 100644 index 000000000000..72dcc185b9dc --- /dev/null +++ b/bigquery/samples/load_table_dataframe.py @@ -0,0 +1,63 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def load_table_dataframe(client, table_id): + # [START bigquery_load_table_dataframe] + from google.cloud import bigquery + import pandas + + # TODO(developer): Construct a BigQuery client object. + # client = bigquery.Client() + + # TODO(developer): Set table_id to the ID of the table to create. + # table_id = "your-project.your_dataset.your_table_name" + + records = [ + {"title": u"The Meaning of Life", "release_year": 1983}, + {"title": u"Monty Python and the Holy Grail", "release_year": 1975}, + {"title": u"Life of Brian", "release_year": 1979}, + {"title": u"And Now for Something Completely Different", "release_year": 1971}, + ] + # Optionally set explicit indices. + index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"] + dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id")) + job_config = bigquery.LoadJobConfig( + # Specify a (partial) schema. All columns are always written to the + # table. The schema is used to assist in data type definitions. + schema=[ + # Specify the type of columns whose type cannot be auto-detected. For + # example the "title" column uses pandas dtype "object", so its + # data type is ambiguous. + bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING), + # Indexes are written if included in the schema by name. + bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING), + ] + ) + + job = client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config, location="US" + ) + job.result() # Waits for table load to complete. 
+
+    table = client.get_table(table_id)
+    print("Wrote {} rows to {}".format(table.num_rows, table_id))
+    # [END bigquery_load_table_dataframe]
+
+
+if __name__ == "__main__":
+    import sys
+    from google.cloud import bigquery
+
+    load_table_dataframe(bigquery.Client(), sys.argv[1])
diff --git a/bigquery/samples/tests/test_load_table_dataframe.py b/bigquery/samples/tests/test_load_table_dataframe.py
new file mode 100644
index 000000000000..d5fff8b48df7
--- /dev/null
+++ b/bigquery/samples/tests/test_load_table_dataframe.py
@@ -0,0 +1,28 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from .. import load_table_dataframe
+
+
+pytest.importorskip("pandas")
+pytest.importorskip("pyarrow")
+
+
+def test_load_table_dataframe(client, random_table_id):
+    load_table_dataframe.load_table_dataframe(client, random_table_id)
+
+    column_names = [field.name for field in table.schema]
+    assert sorted(column_names) == ["release_year", "title", "wikidata_id"]

From 011a0d7f7ff7ff7f6151b0cf4d0430f8f3495bc3 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Mon, 26 Aug 2019 16:13:16 -0700
Subject: [PATCH 2/9] Add unit tests for get_column_or_index and
 list_columns_and_indexes
---
 .../google/cloud/bigquery/_pandas_helpers.py |  51 +++---
 bigquery/tests/unit/test__pandas_helpers.py  | 173 +++++++++++++++++-
 2 files changed, 201 insertions(+), 23 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index c2845f46357f..b082befddb81 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,24 +187,46 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
-def _columns_and_indexes(dataframe):
+def get_column_or_index(dataframe, name):
+    """Return a column or index as a pandas series."""
+    if name in dataframe.columns:
+        return dataframe[name].reset_index(drop=True)
+
+    if isinstance(dataframe.index, pandas.MultiIndex):
+        if name in dataframe.index.names:
+            return (
+                dataframe.index.get_level_values(name)
+                .to_series()
+                .reset_index(drop=True)
+            )
+    else:
+        if name == dataframe.index.name:
+            return dataframe.index.to_series().reset_index(drop=True)
+
+    raise ValueError("column or index '{}' not found.".format(name))
+
+
+def list_columns_and_indexes(dataframe):
     """Return all index and column names with dtypes.
 
     Returns:
         Sequence[Tuple[str, dtype]]:
             Returns a list of indexes and column names with
-            corresponding dtypes.
+            corresponding dtypes. If an index is missing a name or has the
+            same name as a column, the index is omitted.
""" + column_names = frozenset(dataframe.columns) columns_and_indexes = [] if isinstance(dataframe.index, pandas.MultiIndex): for name in dataframe.index.names: - if name: + if name and name not in column_names: values = dataframe.index.get_level_values(name) columns_and_indexes.append((name, values.dtype)) else: if dataframe.index.name: columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype)) + # Add columns last so that if you iterate over the list, the column values overwrite any indexes with the same name. columns_and_indexes += zip(dataframe.columns, dataframe.dtypes) return columns_and_indexes @@ -239,7 +261,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): bq_schema_unused = set() bq_schema_out = [] - for column, dtype in _columns_and_indexes(dataframe): + for column, dtype in list_columns_and_indexes(dataframe): # Use provided type from schema, if present. bq_field = bq_schema_index.get(column) if bq_field: @@ -267,21 +289,6 @@ def dataframe_to_bq_schema(dataframe, bq_schema): return tuple(bq_schema_out) -def _column_or_index(dataframe, name): - """Return a column or index as a pandas series.""" - if name in dataframe.columns: - return dataframe[name] - - if isinstance(dataframe.index, pandas.MultiIndex): - if name in dataframe.index.names: - return dataframe.index.get_level_values(name) - else: - if name == dataframe.index.name: - return dataframe.index.to_series() - - raise ValueError("column or index '{}' not found.".format(name)) - - def dataframe_to_arrow(dataframe, bq_schema): """Convert pandas dataframe to Arrow table, using BigQuery schema. @@ -298,7 +305,9 @@ def dataframe_to_arrow(dataframe, bq_schema): BigQuery schema. """ column_names = set(dataframe.columns) - column_and_index_names = set(name for name, _ in _columns_and_indexes(dataframe)) + column_and_index_names = set( + name for name, _ in list_columns_and_indexes(dataframe) + ) bq_field_names = set(field.name for field in bq_schema) extra_fields = bq_field_names - column_and_index_names @@ -324,7 +333,7 @@ def dataframe_to_arrow(dataframe, bq_schema): arrow_fields.append(bq_to_arrow_field(bq_field)) arrow_names.append(bq_field.name) arrow_arrays.append( - bq_to_arrow_array(_column_or_index(dataframe, bq_field.name), bq_field) + bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) ) if all((field is not None for field in arrow_fields)): diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index facfb79b3ccb..c4e3f2ca3569 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
 
+import collections
 import datetime
 import decimal
 import functools
@@ -21,6 +22,8 @@
 
 try:
     import pandas
+    import pandas.api.types
+    import pandas.testing
 except ImportError:  # pragma: NO COVER
     pandas = None
 try:
@@ -511,9 +514,175 @@ def test_bq_to_arrow_schema_w_unknown_type(module_under_test):
     assert actual is None
 
 
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_not_found(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"not_the_column_youre_looking_for": [1, 2, 3]},
+    )
+    with pytest.raises(ValueError, match="col_is_missing"):
+        module_under_test.get_column_or_index(dataframe, "col_is_missing")
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_both_prefers_column(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"some_name": [1, 2, 3]}, index=pandas.Index([0, 1, 2], name="some_name")
+    )
+    series = module_under_test.get_column_or_index(dataframe, "some_name")
+    expected = pandas.Series([1, 2, 3], name="some_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_column(module_under_test):
+    dataframe = pandas.DataFrame({"column_name": [1, 2, 3], "other_column": [4, 5, 6]})
+    series = module_under_test.get_column_or_index(dataframe, "column_name")
+    expected = pandas.Series([1, 2, 3], name="column_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_index(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3]}, index=pandas.Index([4, 5, 6], name="index_name")
+    )
+    series = module_under_test.get_column_or_index(dataframe, "index_name")
+    expected = pandas.Series([4, 5, 6], name="index_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_datetimeindex(module_under_test):
+    datetimes = [
+        datetime.datetime(2000, 1, 2, 3, 4, 5, 101),
+        datetime.datetime(2006, 7, 8, 9, 10, 11, 202),
+        datetime.datetime(2012, 1, 14, 15, 16, 17, 303),
+    ]
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3]},
+        index=pandas.DatetimeIndex(datetimes, name="index_name"),
+    )
+    series = module_under_test.get_column_or_index(dataframe, "index_name")
+    expected = pandas.Series(datetimes, name="index_name")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_multiindex(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3, 4, 5, 6]},
+        index=pandas.MultiIndex.from_tuples(
+            [("a", 0), ("a", 1), ("b", 0), ("b", 1), ("c", 0), ("c", 1)],
+            names=["letters", "numbers"],
+        ),
+    )
+
+    series = module_under_test.get_column_or_index(dataframe, "letters")
+    expected = pandas.Series(["a", "a", "b", "b", "c", "c"], name="letters")
+    pandas.testing.assert_series_equal(series, expected)
+
+    series = module_under_test.get_column_or_index(dataframe, "numbers")
+    expected = pandas.Series([0, 1, 0, 1, 0, 1], name="numbers")
+    pandas.testing.assert_series_equal(series, expected)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_without_named_index(module_under_test):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
("c_series", ["a", "b", "c", "d"]), + ] + ) + dataframe = pandas.DataFrame(df_data) + + columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe) + expected = [ + ("a_series", pandas.api.types.pandas_dtype("int64")), + ("b_series", pandas.api.types.pandas_dtype("float64")), + ("c_series", pandas.api.types.pandas_dtype("object")), + ] + assert columns_and_indexes == expected + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_list_columns_and_indexes_with_index(module_under_test): + df_data = collections.OrderedDict( + [ + ("a_series", [1, 2, 3, 4]), + ("b_series", [0.1, 0.2, 0.3, 0.4]), + ("c_series", ["a", "b", "c", "d"]), + ] + ) + dataframe = pandas.DataFrame( + df_data, index=pandas.Index([4, 5, 6, 7], name="a_index") + ) + + columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe) + expected = [ + ("a_index", pandas.api.types.pandas_dtype("int64")), + ("a_series", pandas.api.types.pandas_dtype("int64")), + ("b_series", pandas.api.types.pandas_dtype("float64")), + ("c_series", pandas.api.types.pandas_dtype("object")), + ] + assert columns_and_indexes == expected + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_list_columns_and_indexes_with_multiindex(module_under_test): + df_data = collections.OrderedDict( + [ + ("a_series", [1, 2, 3, 4]), + ("b_series", [0.1, 0.2, 0.3, 0.4]), + ("c_series", ["a", "b", "c", "d"]), + ] + ) + dataframe = pandas.DataFrame( + df_data, + index=pandas.MultiIndex.from_tuples( + [(0, 0, 41), (0, 0, 42), (1, 0, 41), (1, 1, 41)], + names=[ + "a_index", + # Use same name as column, but different dtype so we can verify + # the column type is included. + "b_series", + "c_index", + ], + ), + ) + + columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe) + expected = [ + ("a_index", pandas.api.types.pandas_dtype("int64")), + ("c_index", pandas.api.types.pandas_dtype("int64")), + ("a_series", pandas.api.types.pandas_dtype("int64")), + ("b_series", pandas.api.types.pandas_dtype("float64")), + ("c_series", pandas.api.types.pandas_dtype("object")), + ] + assert columns_and_indexes == expected + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_dataframe_to_arrow_with_multiindex(module_under_test): + bq_schema = ( + schema.SchemaField("int_index", "INTEGER"), + schema.SchemaField("str_col", "STRING"), + schema.SchemaField("int_col", "INTEGER"), + schema.SchemaField("nullable_int_col", "INTEGER"), + schema.SchemaField("str_index", "STRING"), + ) + df_data = collections.OrderedDict + dataframe = pandas.DataFrame( + + ) + assert False + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_dataframe_to_arrow_w_required_fields(module_under_test): +def test_dataframe_to_arrow_with_required_fields(module_under_test): bq_schema = ( schema.SchemaField("field01", "STRING", mode="REQUIRED"), schema.SchemaField("field02", "BYTES", mode="REQUIRED"), @@ -568,7 +737,7 @@ def test_dataframe_to_arrow_w_required_fields(module_under_test): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -def test_dataframe_to_arrow_w_unknown_type(module_under_test): +def test_dataframe_to_arrow_with_unknown_type(module_under_test): bq_schema = ( schema.SchemaField("field00", "UNKNOWN_TYPE"), schema.SchemaField("field01", "STRING"), From 
6a3fd3babcc680401db1afffdeddf4e1ec5b9aaa Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 27 Aug 2019 10:09:39 -0700 Subject: [PATCH 3/9] Add unit test for dataframe_to_arrow with indexes. --- bigquery/tests/unit/test__pandas_helpers.py | 67 ++++++++++++++++++--- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index c4e3f2ca3569..b371ac03fe6d 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -516,9 +516,7 @@ def test_bq_to_arrow_schema_w_unknown_type(module_under_test): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_get_column_or_index_not_found(module_under_test): - dataframe = pandas.DataFrame( - {"not_the_column_youre_looking_for": [1, 2, 3]}, - ) + dataframe = pandas.DataFrame({"not_the_column_youre_looking_for": [1, 2, 3]}) with pytest.raises(ValueError, match="col_is_missing"): module_under_test.get_column_or_index(dataframe, "col_is_missing") @@ -667,17 +665,66 @@ def test_list_columns_and_indexes_with_multiindex(module_under_test): @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") def test_dataframe_to_arrow_with_multiindex(module_under_test): bq_schema = ( - schema.SchemaField("int_index", "INTEGER"), - schema.SchemaField("str_col", "STRING"), + schema.SchemaField("str_index", "STRING"), + # int_index is intentionally omitted, to verify that it's okay to be + # missing indexes from the schema. + schema.SchemaField("dt_index", "DATETIME"), schema.SchemaField("int_col", "INTEGER"), schema.SchemaField("nullable_int_col", "INTEGER"), - schema.SchemaField("str_index", "STRING"), + schema.SchemaField("str_col", "STRING"), ) - df_data = collections.OrderedDict - dataframe = pandas.DataFrame( - + df_data = collections.OrderedDict( + [ + ("int_col", [1, 2, 3, 4, 5, 6]), + ("nullable_int_col", [6.0, float("nan"), 7.0, float("nan"), 8.0, 9.0]), + ("str_col", ["apple", "banana", "cherry", "durian", "etrog", "fig"]), + ] + ) + df_index = pandas.MultiIndex.from_tuples( + [ + ("a", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("a", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ("a", 1, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("b", 1, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ("b", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)), + ("b", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)), + ], + names=["str_index", "int_index", "dt_index"], ) - assert False + dataframe = pandas.DataFrame(df_data, index=df_index) + + arrow_table = module_under_test.dataframe_to_arrow(dataframe, bq_schema) + + assert arrow_table.schema.names == [ + "str_index", + "dt_index", + "int_col", + "nullable_int_col", + "str_col", + ] + arrow_data = arrow_table.to_pydict() + assert arrow_data["str_index"] == ["a", "a", "a", "b", "b", "b"] + assert arrow_data["dt_index"] == [ + pandas.Timestamp(dt) + for dt in ( + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + datetime.datetime(1999, 12, 31, 23, 59, 59, 999999), + datetime.datetime(2000, 1, 1, 0, 0, 0), + ) + ] + assert arrow_data["int_col"] == [1, 2, 3, 4, 5, 6] + assert arrow_data["nullable_int_col"] == [6, None, 7, None, 8, 9] + assert arrow_data["str_col"] == [ + "apple", + "banana", + "cherry", + "durian", + "etrog", + "fig", + ] @pytest.mark.skipif(pandas is None, reason="Requires 
`pandas`") From 9b26faf025190d5260d71e07132d478dacb43c2a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 27 Aug 2019 10:50:35 -0700 Subject: [PATCH 4/9] Update tests for load_table_dataframe sample. --- bigquery/samples/load_table_dataframe.py | 34 ++++++++++++------- .../tests/test_load_table_dataframe.py | 8 +++-- bigquery/tests/unit/test__pandas_helpers.py | 15 +++++++- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/bigquery/samples/load_table_dataframe.py b/bigquery/samples/load_table_dataframe.py index 72dcc185b9dc..69eeb6ef89d0 100644 --- a/bigquery/samples/load_table_dataframe.py +++ b/bigquery/samples/load_table_dataframe.py @@ -30,9 +30,17 @@ def load_table_dataframe(client, table_id): {"title": u"Life of Brian", "release_year": 1979}, {"title": u"And Now for Something Completely Different", "release_year": 1971}, ] - # Optionally set explicit indices. - index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"] - dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id")) + dataframe = pandas.DataFrame( + records, + # In the loaded table, the column order reflects the order of the + # columns in the DataFrame. + columns=["title", "release_year"], + # Optionally, set a named index, which can also be written to the + # BigQuery table. + index=pandas.Index( + [u"Q24980", u"Q25043", u"Q24953", u"Q16403"], name="wikidata_id" + ), + ) job_config = bigquery.LoadJobConfig( # Specify a (partial) schema. All columns are always written to the # table. The schema is used to assist in data type definitions. @@ -43,7 +51,11 @@ def load_table_dataframe(client, table_id): bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING), # Indexes are written if included in the schema by name. bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING), - ] + ], + # Optionally, set the write disposition. BigQuery appends loaded rows + # to an existing table by default, but with WRITE_TRUNCATE write + # disposition it replaces the table with the loaded data. + write_disposition="WRITE_TRUNCATE", ) job = client.load_table_from_dataframe( @@ -52,12 +64,10 @@ def load_table_dataframe(client, table_id): job.result() # Waits for table load to complete. 
 
     table = client.get_table(table_id)
-    print("Wrote {} rows to {}".format(table.num_rows, table_id))
+    print(
+        "Loaded {} rows and {} columns to {}".format(
+            table.num_rows, len(table.schema), table_id
+        )
+    )
     # [END bigquery_load_table_dataframe]
-
-
-if __name__ == "__main__":
-    import sys
-    from google.cloud import bigquery
-
-    load_table_dataframe(bigquery.Client(), sys.argv[1])
+    return table
diff --git a/bigquery/samples/tests/test_load_table_dataframe.py b/bigquery/samples/tests/test_load_table_dataframe.py
index d5fff8b48df7..d553d449a525 100644
--- a/bigquery/samples/tests/test_load_table_dataframe.py
+++ b/bigquery/samples/tests/test_load_table_dataframe.py
@@ -21,8 +21,10 @@
 pytest.importorskip("pyarrow")
 
 
-def test_load_table_dataframe(client, random_table_id):
-    load_table_dataframe.load_table_dataframe(client, random_table_id)
+def test_load_table_dataframe(capsys, client, random_table_id):
+    table = load_table_dataframe.load_table_dataframe(client, random_table_id)
+    out, _ = capsys.readouterr()
+    assert "Loaded 4 rows and 3 columns" in out
 
     column_names = [field.name for field in table.schema]
-    assert sorted(column_names) == ["release_year", "title", "wikidata_id"]
+    assert column_names == ["wikidata_id", "title", "release_year"]
diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index b371ac03fe6d..514b37666cdb 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -521,6 +521,18 @@ def test_get_column_or_index_not_found(module_under_test):
         module_under_test.get_column_or_index(dataframe, "col_is_missing")
 
 
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_get_column_or_index_with_multiindex_not_found(module_under_test):
+    dataframe = pandas.DataFrame(
+        {"column_name": [1, 2, 3, 4, 5, 6]},
+        index=pandas.MultiIndex.from_tuples(
+            [("a", 0), ("a", 1), ("b", 0), ("b", 1), ("c", 0), ("c", 1)]
+        ),
+    )
+    with pytest.raises(ValueError, match="not_in_df"):
+        module_under_test.get_column_or_index(dataframe, "not_in_df")
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 def test_get_column_or_index_with_both_prefers_column(module_under_test):
     dataframe = pandas.DataFrame(
@@ -704,7 +716,7 @@ def test_dataframe_to_arrow_with_multiindex(module_under_test):
     ]
     arrow_data = arrow_table.to_pydict()
     assert arrow_data["str_index"] == ["a", "a", "a", "b", "b", "b"]
-    assert arrow_data["dt_index"] == [
+    expected_dt_index = [
         pandas.Timestamp(dt)
         for dt in (
             datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
             datetime.datetime(2000, 1, 1, 0, 0, 0),
             datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
             datetime.datetime(2000, 1, 1, 0, 0, 0),
             datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
             datetime.datetime(2000, 1, 1, 0, 0, 0),
         )
     ]
+    assert arrow_data["dt_index"] == expected_dt_index
     assert arrow_data["int_col"] == [1, 2, 3, 4, 5, 6]
     assert arrow_data["nullable_int_col"] == [6, None, 7, None, 8, 9]
     assert arrow_data["str_col"] == [

From 4e90e3ee1d5f97293226e32ef5896cc5d5b78143 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 27 Aug 2019 10:56:55 -0700
Subject: [PATCH 5/9] Update reference to moved load_table_dataframe sample.
---
 bigquery/docs/usage/pandas.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bigquery/docs/usage/pandas.rst b/bigquery/docs/usage/pandas.rst
index 9504bd19673a..9db98dfbbccb 100644
--- a/bigquery/docs/usage/pandas.rst
+++ b/bigquery/docs/usage/pandas.rst
@@ -55,7 +55,7 @@ install the BigQuery python client library with :mod:`pandas` and
 The following example demonstrates how to create a :class:`pandas.DataFrame`
 and load it into a new table:
 
-.. literalinclude:: ../snippets.py
+.. literalinclude:: ../samples/load_table_dataframe.py
    :language: python
    :dedent: 4
    :start-after: [START bigquery_load_table_dataframe]

From 349a43946a1f88a73629a1813fbfbfbc025575f5 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 28 Aug 2019 08:48:26 -0700
Subject: [PATCH 6/9] Use unicode strings for ValueErrors.
---
 bigquery/google/cloud/bigquery/_pandas_helpers.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index b082befddb81..4be27beda505 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -273,7 +273,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
         # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
-            warnings.warn("Unable to determine type of column '{}'.".format(column))
+            warnings.warn(u"Unable to determine type of column '{}'.".format(column))
             return None
         bq_field = schema.SchemaField(column, bq_type)
         bq_schema_out.append(bq_field)
@@ -282,7 +282,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # column, but it was not found.
     if bq_schema_unused:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 bq_schema_unused
             )
         )
@@ -313,7 +313,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     extra_fields = bq_field_names - column_and_index_names
     if extra_fields:
         raise ValueError(
-            "bq_schema contains fields not present in dataframe: {}".format(
+            u"bq_schema contains fields not present in dataframe: {}".format(
                 extra_fields
             )
         )
@@ -323,7 +323,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     missing_fields = column_names - bq_field_names
     if missing_fields:
         raise ValueError(
-            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
+            u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
         )
 
     arrow_arrays = []

From 1d8b89b30430445af71063caae3bbdd0c2be13d6 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 28 Aug 2019 08:54:33 -0700
Subject: [PATCH 7/9] Don't include index if has same name as column name.
---
 .../google/cloud/bigquery/_pandas_helpers.py |  2 +-
 bigquery/tests/unit/test__pandas_helpers.py  | 29 +++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 4be27beda505..ed539d22a37e 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -223,7 +223,7 @@ def list_columns_and_indexes(dataframe):
                 values = dataframe.index.get_level_values(name)
                 columns_and_indexes.append((name, values.dtype))
     else:
-        if dataframe.index.name:
+        if dataframe.index.name and dataframe.index.name not in column_names:
             columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
 
diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index 514b37666cdb..c1c85db20aa3 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -552,7 +552,7 @@ def test_get_column_or_index_with_column(module_under_test):
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_get_column_or_index_with_index(module_under_test):
+def test_get_column_or_index_with_named_index(module_under_test):
     dataframe = pandas.DataFrame(
         {"column_name": [1, 2, 3]}, index=pandas.Index([4, 5, 6], name="index_name")
     )
@@ -617,7 +617,32 @@ def test_list_columns_and_indexes_without_named_index(module_under_test):
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_list_columns_and_indexes_with_index(module_under_test):
+def test_list_columns_and_indexes_with_named_index_same_as_column_name(module_under_test):
+    df_data = collections.OrderedDict(
+        [
+            ("a_series", [1, 2, 3, 4]),
+            ("b_series", [0.1, 0.2, 0.3, 0.4]),
+            ("c_series", ["a", "b", "c", "d"]),
+        ]
+    )
+    dataframe = pandas.DataFrame(
+        df_data,
+        # Use same name as an integer column but a different datatype so that
+        # we can verify that the column is listed but the index isn't.
+        index=pandas.Index([0.1, 0.2, 0.3, 0.4], name="a_series")
+    )
+
+    columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
+    expected = [
+        ("a_series", pandas.api.types.pandas_dtype("int64")),
+        ("b_series", pandas.api.types.pandas_dtype("float64")),
+        ("c_series", pandas.api.types.pandas_dtype("object")),
+    ]
+    assert columns_and_indexes == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_list_columns_and_indexes_with_named_index(module_under_test):
     df_data = collections.OrderedDict(
         [
             ("a_series", [1, 2, 3, 4]),

From 99983d53c85a2edcecaf469a678a66352edbc652 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 28 Aug 2019 08:56:08 -0700
Subject: [PATCH 8/9] Remove incorrect comment about column/index listing.
---
 bigquery/google/cloud/bigquery/_pandas_helpers.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index ed539d22a37e..5e73c9f58e22 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -226,7 +226,6 @@ def list_columns_and_indexes(dataframe):
         if dataframe.index.name and dataframe.index.name not in column_names:
             columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
 
-    # Add columns last so that if you iterate over the list, the column values overwrite any indexes with the same name.
     columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
     return columns_and_indexes
 

From e5a0b19a68c365c1158beb40aa151cd4e9680916 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 28 Aug 2019 11:21:29 -0700
Subject: [PATCH 9/9] Blacken
---
 bigquery/tests/unit/test__pandas_helpers.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index c1c85db20aa3..b539abe9a89a 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -617,7 +617,9 @@ def test_list_columns_and_indexes_without_named_index(module_under_test):
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_list_columns_and_indexes_with_named_index_same_as_column_name(module_under_test):
+def test_list_columns_and_indexes_with_named_index_same_as_column_name(
+    module_under_test
+):
     df_data = collections.OrderedDict(
         [
             ("a_series", [1, 2, 3, 4]),
@@ -629,7 +631,7 @@ def test_list_columns_and_indexes_with_named_index_same_as_column_name(
         df_data,
         # Use same name as an integer column but a different datatype so that
        # we can verify that the column is listed but the index isn't.
-        index=pandas.Index([0.1, 0.2, 0.3, 0.4], name="a_series")
+        index=pandas.Index([0.1, 0.2, 0.3, 0.4], name="a_series"),
    )
 
     columns_and_indexes = module_under_test.list_columns_and_indexes(dataframe)
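
Usage sketch (illustrative; not one of the patches above): a minimal example of
the behavior this series adds, assuming the patches are applied. The table ID
and the MultiIndex level names below are placeholders invented for
illustration. Per patch 1's commit message, a named index level is written
only when it appears in the schema passed to `load_table_from_dataframe`.

    import datetime

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    table_id = "my-project.my_dataset.my_table"  # placeholder

    dataframe = pandas.DataFrame(
        {"amount": [10, 20]},
        index=pandas.MultiIndex.from_tuples(
            [
                (u"batch-a", datetime.datetime(2019, 8, 26, 12, 0)),
                (u"batch-b", datetime.datetime(2019, 8, 27, 12, 0)),
            ],
            # Both levels are named, but only "batch" is listed in the
            # schema below, so only "batch" is serialized to the table.
            names=["batch", "created"],
        ),
    )

    job_config = bigquery.LoadJobConfig(
        schema=[
            # "batch" has the ambiguous pandas dtype "object", so give it an
            # explicit type; naming the index level here also opts it into
            # serialization.
            bigquery.SchemaField("batch", bigquery.enums.SqlTypeNames.STRING)
        ]
    )
    job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Waits for table load to complete.

    # The loaded table has "batch" and "amount" columns. The named "created"
    # level is omitted because it is not in the schema, while "amount" is
    # always written because it is a column; its INTEGER type is
    # auto-detected from the int64 dtype.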