diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index dacaa8074f6a..8ba7fee892e5 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import itertools
-import json
 import logging
 import time
 import unittest
@@ -2271,26 +2270,26 @@ def test_to_dataframe_w_bqstorage_logs_session(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_empty_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
         session = bigquery_storage_v1beta1.types.ReadSession(
-            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-        )
-        session.avro_schema.schema = json.dumps(
-            {
-                "fields": [
-                    {"name": "colA"},
-                    # Not alphabetical to test column order.
-                    {"name": "colC"},
-                    {"name": "colB"},
-                ]
-            }
+            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
 
@@ -2327,11 +2326,20 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_nonempty(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2340,16 +2348,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps(
-            {
-                "fields": [
-                    {"name": "colA"},
-                    # Not alphabetical to test column order.
-                    {"name": "colC"},
-                    {"name": "colB"},
-                ]
-            }
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
 
@@ -2400,17 +2401,23 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [pyarrow.field("colA", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         streams = [
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps({"fields": [{"name": "colA"}]})
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+        )
 
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
@@ -2448,6 +2455,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @mock.patch("tqdm.tqdm")
     def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         from google.cloud.bigquery import schema
@@ -2457,6 +2465,9 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01
 
+        arrow_fields = [pyarrow.field("testcol", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2466,8 +2477,10 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+        )
         bqstorage_client.create_read_session.return_value = session
 
         mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
@@ -2521,6 +2534,7 @@ def blocking_to_dataframe(*args, **kwargs):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
@@ -2529,6 +2543,14 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2539,10 +2561,8 @@
                 # ends early.
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
-            ]
-        )
-        session.avro_schema.schema = json.dumps(
-            {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+            ],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session