From 4dd9ee9c543bf031dc3847869afebcda6decef96 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 3 Jul 2019 16:31:00 -0700
Subject: [PATCH 1/9] Add `to_arrow` to get a `pyarrow.Table` from query
 results.

An Arrow `Table` supports a richer set of types than a pandas
`DataFrame`, and is the basis of many data analysis systems. It can be
used in conjunction with pandas through the `Table.to_pandas()` method
or the pandas extension types provided by the `fletcher` package.
---
 bigquery/google/cloud/bigquery/_helpers.py    |  15 +--
 .../google/cloud/bigquery/_pandas_helpers.py  |  61 +++++++++--
 bigquery/google/cloud/bigquery/job.py         |  40 +++++++
 bigquery/google/cloud/bigquery/table.py       | 101 ++++++++++++++++++
 bigquery/samples/query_to_arrow.py            |  58 ++++++++++
 bigquery/samples/tests/test_query_to_arrow.py |  29 +++++
 bigquery/tests/unit/test_table.py             |  26 +++--
 7 files changed, 307 insertions(+), 23 deletions(-)
 create mode 100644 bigquery/samples/query_to_arrow.py
 create mode 100644 bigquery/samples/tests/test_query_to_arrow.py

diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 90b1f14016b7..5bd92aea951f 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -197,6 +197,14 @@ def _field_to_index_mapping(schema):
     return {f.name: i for i, f in enumerate(schema)}
 
 
+def _field_from_json(resource, field):
+    converter = _CELLDATA_FROM_JSON[field.field_type]
+    if field.mode == "REPEATED":
+        return [converter(item["v"], field) for item in resource]
+    else:
+        return converter(resource, field)
+
+
 def _row_tuple_from_json(row, schema):
     """Convert JSON row data to row with appropriate types.
 
@@ -214,12 +222,7 @@ def _row_tuple_from_json(row, schema):
     """
     row_data = []
     for field, cell in zip(schema, row["f"]):
-        converter = _CELLDATA_FROM_JSON[field.field_type]
-        if field.mode == "REPEATED":
-            row_data.append([converter(item["v"], field) for item in cell["v"]])
-        else:
-            row_data.append(converter(cell["v"], field))
-
+        row_data.append(_field_from_json(cell["v"], field))
     return tuple(row_data)
 
 
diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 5261c2b99efd..306e16a55ce9 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -14,7 +14,6 @@
 
 """Shared helper functions for connecting BigQuery and pandas."""
 
-import collections
 import concurrent.futures
 import warnings
 
@@ -144,6 +143,26 @@ def bq_to_arrow_field(bq_field):
     return None
 
 
+def bq_to_arrow_schema(bq_schema):
+    """Return the Arrow schema, corresponding to a given BigQuery schema.
+
+    Raises:
+        ValueError:
+            If the Arrow type of any column cannot be determined.
+    """
+    arrow_fields = []
+    for bq_field in bq_schema:
+        arrow_field = bq_to_arrow_field(bq_field)
+        if arrow_field is None:
+            raise ValueError(
+                "Could not determine Arrow type for BigQuery field: {}.".format(
+                    repr(bq_field)
+                )
+            )
+        arrow_fields.append(arrow_field)
+    return pyarrow.schema(arrow_fields)
+
+
 def bq_to_arrow_array(series, bq_field):
     arrow_type = bq_to_arrow_data_type(bq_field)
     if bq_field.mode.upper() == "REPEATED":
@@ -210,13 +229,41 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath):
     pyarrow.parquet.write_table(arrow_table, filepath)
 
 
+def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
+    # Iterate over the page to force the API request to get the page data.
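+    # ``page._columns`` (used below) is attached to the page by
+    # ``_rows_page_start`` in table.py; each entry is a generator of cell
+    # values for one column, parsed with ``_helpers._field_from_json``.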
+    try:
+        next(iter(page))
+    except StopIteration:
+        pass
+
+    arrays = []
+    for column_index, arrow_type in enumerate(arrow_types):
+        arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type))
+
+    return pyarrow.RecordBatch.from_arrays(arrays, column_names)
+
+
+def download_arrow_tabledata_list(pages, schema):
+    """Use tabledata.list to construct an iterable of RecordBatches."""
+    column_names = [field.name for field in schema]
+    arrow_types = [bq_to_arrow_data_type(field) for field in schema]
+
+    for page in pages:
+        yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
+
+
 def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
-    columns = collections.defaultdict(list)
-    for row in page:
-        for column in column_names:
-            columns[column].append(row[column])
-    for column in dtypes:
-        columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
+    # Iterate over the page to force the API request to get the page data.
+    try:
+        next(iter(page))
+    except StopIteration:
+        pass
+
+    columns = {}
+    for column_index, column_name in enumerate(column_names):
+        dtype = dtypes.get(column_name)
+        columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype)
+
     return pandas.DataFrame(columns, columns=column_names)
 
 
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 87dab59e339b..ca6ccefe7167 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2896,6 +2896,46 @@ def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY):
             rows._preserve_order = _contains_order_by(self.query)
         return rows
 
+    def to_arrow(self, progress_bar_type=None):
+        """[Beta] Create a :class:`pyarrow.Table` by loading all pages of a
+        table or query.
+
+        Args:
+            progress_bar_type (Optional[str]):
+                If set, use the `tqdm <https://tqdm.github.io/>`_ library to
+                display a progress bar while the data downloads. Install the
+                ``tqdm`` package to use this feature.
+
+                Possible values of ``progress_bar_type`` include:
+
+                ``None``
+                    No progress bar.
+                ``'tqdm'``
+                    Use the :func:`tqdm.tqdm` function to print a progress bar
+                    to :data:`sys.stderr`.
+                ``'tqdm_notebook'``
+                    Use the :func:`tqdm.tqdm_notebook` function to display a
+                    progress bar as a Jupyter notebook widget.
+                ``'tqdm_gui'``
+                    Use the :func:`tqdm.tqdm_gui` function to display a
+                    progress bar as a graphical dialog box.
+
+        Returns:
+            pyarrow.Table
+                A :class:`pyarrow.Table` populated with row data and column
+                headers from the query results. The column headers are derived
+                from the destination table's schema.
+
+        Raises:
+            ValueError:
+                If the :mod:`pyarrow` library cannot be imported.
+
+        .. versionadded:: 1.17.0
+        """
+        return self.result().to_arrow(
+            progress_bar_type=progress_bar_type,
+        )
+
     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob
 
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 7af3bc6f48b4..a1af28ec7a12 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -33,6 +33,11 @@
 except ImportError:  # pragma: NO COVER
     pandas = None
 
+try:
+    import pyarrow
+except ImportError:  # pragma: NO COVER
+    pyarrow = None
+
 try:
     import tqdm
 except ImportError:  # pragma: NO COVER
@@ -58,6 +63,10 @@
     "The pandas library is not installed, please install "
     "pandas to use the to_dataframe() function."
 )
+_NO_PYARROW_ERROR = (
+    "The pyarrow library is not installed, please install "
+    "pyarrow to use the to_arrow() function."
+)
 _NO_TQDM_ERROR = (
     "A progress bar was requested, but there was an error loading the tqdm "
     "library. Please install tqdm to use the progress bar functionality."
@@ -1394,6 +1403,75 @@ def _get_progress_bar(self, progress_bar_type):
             warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
         return None
 
+    def _to_arrow_iterable(self):
+        """Create an iterable of Arrow RecordBatches, to process the table as a stream."""
+        for record_batch in _pandas_helpers.download_arrow_tabledata_list(
+            iter(self.pages), self.schema
+        ):
+            yield record_batch
+
+    def to_arrow(self, progress_bar_type=None):
+        """[Beta] Create a :class:`pyarrow.Table` by loading all pages of a
+        table or query.
+
+        Args:
+            progress_bar_type (Optional[str]):
+                If set, use the `tqdm <https://tqdm.github.io/>`_ library to
+                display a progress bar while the data downloads. Install the
+                ``tqdm`` package to use this feature.
+
+                Possible values of ``progress_bar_type`` include:
+
+                ``None``
+                    No progress bar.
+                ``'tqdm'``
+                    Use the :func:`tqdm.tqdm` function to print a progress bar
+                    to :data:`sys.stderr`.
+                ``'tqdm_notebook'``
+                    Use the :func:`tqdm.tqdm_notebook` function to display a
+                    progress bar as a Jupyter notebook widget.
+                ``'tqdm_gui'``
+                    Use the :func:`tqdm.tqdm_gui` function to display a
+                    progress bar as a graphical dialog box.
+
+        Returns:
+            pyarrow.Table
+                A :class:`pyarrow.Table` populated with row data and column
+                headers from the query results. The column headers are derived
+                from the destination table's schema.
+
+        Raises:
+            ValueError:
+                If the :mod:`pyarrow` library cannot be imported.
+
+        .. versionadded:: 1.17.0
+        """
+        if pyarrow is None:
+            raise ValueError(_NO_PYARROW_ERROR)
+
+        progress_bar = self._get_progress_bar(progress_bar_type)
+
+        record_batches = []
+        for record_batch in self._to_arrow_iterable():
+            record_batches.append(record_batch)
+
+            if progress_bar is not None:
+                # In some cases, the number of total rows is not populated
+                # until the first page of rows is fetched. Update the
+                # progress bar's total to keep an accurate count.
+                progress_bar.total = progress_bar.total or self.total_rows
+                progress_bar.update(record_batch.num_rows)
+
+        if progress_bar is not None:
+            # Indicate that the download has finished.
+            progress_bar.close()
+
+        # Avoid concatenating an empty list without a schema.
+        if not record_batches:
+            arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
+            return pyarrow.Table.from_batches([], schema=arrow_schema)
+        return pyarrow.Table.from_batches(record_batches)
+
     def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -1734,6 +1812,25 @@ def _item_to_row(iterator, resource):
     )
 
 
+def _tabledata_list_page_columns(schema, response):
+    """Make a generator of all the columns in a page from tabledata.list.
+
+    This enables creating a :class:`pandas.DataFrame` and other
+    column-oriented data structures such as :class:`pyarrow.RecordBatch`.
+    """
+    columns = []
+    rows = response.get("rows", [])
+
+    def get_column_data(field_index, field):
+        for row in rows:
+            yield _helpers._field_from_json(row["f"][field_index]["v"], field)
+
+    for field_index, field in enumerate(schema):
+        columns.append(get_column_data(field_index, field))
+
+    return columns
+
+
 # pylint: disable=unused-argument
 def _rows_page_start(iterator, page, response):
     """Grab total rows when :class:`~google.cloud.iterator.Page` starts.
@@ -1747,6 +1844,10 @@ def _rows_page_start(iterator, page, response):
     :type response: dict
     :param response: The JSON API response for a page of rows in a table.
     """
+    # Save the raw response so that it can be processed separately.
+    page._response = response
+    page._columns = _tabledata_list_page_columns(iterator._schema, response)
+
     total_rows = response.get("totalRows")
     if total_rows is not None:
         total_rows = int(total_rows)
diff --git a/bigquery/samples/query_to_arrow.py b/bigquery/samples/query_to_arrow.py
new file mode 100644
index 000000000000..e3ddc23f889a
--- /dev/null
+++ b/bigquery/samples/query_to_arrow.py
@@ -0,0 +1,58 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def main(client):
+    # [START bigquery_query_to_arrow]
+    # TODO(developer): Import the client library.
+    # from google.cloud import bigquery
+
+    # TODO(developer): Construct a BigQuery client object.
+ # client = bigquery.Client() + + sql = """ + WITH races AS ( + SELECT "800M" AS race, + [STRUCT("Rudisha" as name, [23.4, 26.3, 26.4, 26.1] as splits), + STRUCT("Makhloufi" as name, [24.5, 25.4, 26.6, 26.1] as splits), + STRUCT("Murphy" as name, [23.9, 26.0, 27.0, 26.0] as splits), + STRUCT("Bosse" as name, [23.6, 26.2, 26.5, 27.1] as splits), + STRUCT("Rotich" as name, [24.7, 25.6, 26.9, 26.4] as splits), + STRUCT("Lewandowski" as name, [25.0, 25.7, 26.3, 27.2] as splits), + STRUCT("Kipketer" as name, [23.2, 26.1, 27.3, 29.4] as splits), + STRUCT("Berian" as name, [23.7, 26.1, 27.0, 29.3] as splits)] + AS participants) + SELECT + race, + participant + FROM races r + CROSS JOIN UNNEST(r.participants) as participant; + """ + query_job = client.query(sql) + arrow_table = query_job.to_arrow() + + print( + "Downloaded {} rows, {} columns.".format( + arrow_table.num_rows, arrow_table.num_columns + ) + ) + print("\nSchema:\n{}".format(repr(arrow_table.schema))) + # [END bigquery_query_to_arrow] + return arrow_table + + +if __name__ == "__main__": + from google.cloud import bigquery + + main(bigquery.Client()) diff --git a/bigquery/samples/tests/test_query_to_arrow.py b/bigquery/samples/tests/test_query_to_arrow.py new file mode 100644 index 000000000000..9e36bcee346f --- /dev/null +++ b/bigquery/samples/tests/test_query_to_arrow.py @@ -0,0 +1,29 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyarrow + +from .. import query_to_arrow + + +def test_main(capsys, client): + + arrow_table = query_to_arrow.main(client) + out, err = capsys.readouterr() + assert "Downloaded 8 rows, 2 columns." 
in out + + arrow_schema = arrow_table.schema + assert arrow_schema.names == ["race", "participant"] + assert pyarrow.types.is_string(arrow_schema.types[0]) + assert pyarrow.types.is_struct(arrow_schema.types[1]) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 72d6cf401c69..8585cd73ca69 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -2164,19 +2164,25 @@ def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self): self.assertEqual(df.age.dtype.name, "int64") @unittest.skipIf(pandas is None, "Requires `pandas`") - @mock.patch( - "google.cloud.bigquery.table.RowIterator.pages", new_callable=mock.PropertyMock - ) - def test_to_dataframe_tabledata_list_w_multiple_pages_return_unique_index( - self, mock_pages - ): + def test_to_dataframe_tabledata_list_w_multiple_pages_return_unique_index(self): from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut iterator_schema = [schema.SchemaField("name", "STRING", mode="REQUIRED")] - pages = [[{"name": "Bengt"}], [{"name": "Sven"}]] - - mock_pages.return_value = pages - row_iterator = self._make_one(schema=iterator_schema) + path = "/foo" + api_request = mock.Mock( + side_effect=[ + {"rows": [{"f": [{"v": "Bengt"}]}], "pageToken": "NEXTPAGE"}, + {"rows": [{"f": [{"v": "Sven"}]}]}, + ] + ) + row_iterator = mut.RowIterator( + _mock_client(), + api_request, + path, + iterator_schema, + table=mut.Table("proj.dset.tbl"), + ) df = row_iterator.to_dataframe(bqstorage_client=None) From e04be8e6e19b09654f3c2157a7943456bec384ca Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 3 Jul 2019 16:43:17 -0700 Subject: [PATCH 2/9] _response is unused. --- bigquery/google/cloud/bigquery/table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index a1af28ec7a12..277c72c59f39 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1844,8 +1844,8 @@ def _rows_page_start(iterator, page, response): :type response: dict :param response: The JSON API response for a page of rows in a table. """ - # Save the raw response so that it can be processed separately. - page._response = response + # Make a (lazy) copy of the page in column-oriented format for use in data + # science packages. page._columns = _tabledata_list_page_columns(iterator._schema, response) total_rows = response.get("totalRows") From 3f542dbc33ff3042501967b0db46f0508bde7b5d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 8 Jul 2019 15:11:39 -0700 Subject: [PATCH 3/9] Add unit tests for to_arrow. 
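
The tests exercise `to_arrow` on both `QueryJob` and `RowIterator`
against mocked `tabledata.list` responses: nested STRUCT and ARRAY
columns, unknown field types, progress bar integration, and the
`ValueError` raised when `pyarrow` is not installed. For unknown field
types, `bq_to_arrow_schema` now returns `None` instead of raising, so
downloads fall back to pyarrow's type inference. A rough sketch of the
fallback, using the fake UNKNOWN_TYPE from the tests:

    bq_schema = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("sport", "UNKNOWN_TYPE"),
    ]
    assert bq_to_arrow_schema(bq_schema) is None
    # download_arrow_tabledata_list() then passes type=None for the
    # "sport" column, so pyarrow infers its type from the page values.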
---
 bigquery/google/cloud/bigquery/_helpers.py    |   2 +-
 .../google/cloud/bigquery/_pandas_helpers.py  |  13 +-
 bigquery/google/cloud/bigquery/job.py         |   4 +-
 bigquery/google/cloud/bigquery/table.py       |   7 +-
 bigquery/setup.py                             |   2 +-
 bigquery/tests/unit/test__pandas_helpers.py   |  66 ++++-
 bigquery/tests/unit/test_job.py               | 101 +++++++-
 bigquery/tests/unit/test_table.py             | 230 +++++++++++++++++-
 8 files changed, 400 insertions(+), 25 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 5bd92aea951f..bb3998732a5a 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -198,7 +198,7 @@ def _field_to_index_mapping(schema):
 
 
 def _field_from_json(resource, field):
-    converter = _CELLDATA_FROM_JSON[field.field_type]
+    converter = _CELLDATA_FROM_JSON.get(field.field_type, lambda value, _: value)
     if field.mode == "REPEATED":
         return [converter(item["v"], field) for item in resource]
     else:
diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 306e16a55ce9..4ba6257ec1d8 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -114,7 +114,7 @@ def bq_to_arrow_data_type(field):
     """
     if field.mode is not None and field.mode.upper() == "REPEATED":
         inner_type = bq_to_arrow_data_type(
-            schema.SchemaField(field.name, field.field_type)
+            schema.SchemaField(field.name, field.field_type, fields=field.fields)
         )
         if inner_type:
             return pyarrow.list_(inner_type)
@@ -154,11 +154,8 @@ def bq_to_arrow_schema(bq_schema):
     for bq_field in bq_schema:
         arrow_field = bq_to_arrow_field(bq_field)
         if arrow_field is None:
-            raise ValueError(
-                "Could not determine Arrow type for BigQuery field: {}.".format(
-                    repr(bq_field)
-                )
-            )
+            # Auto-detect the schema if there is an unknown field type.
+            return None
         arrow_fields.append(arrow_field)
     return pyarrow.schema(arrow_fields)
@@ -245,7 +242,7 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
 
 def download_arrow_tabledata_list(pages, schema):
     """Use tabledata.list to construct an iterable of RecordBatches."""
-    column_names = [field.name for field in schema]
+    column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
     arrow_types = [bq_to_arrow_data_type(field) for field in schema]
 
     for page in pages:
@@ -397,7 +394,7 @@ def download_dataframe_bqstorage(
             continue
 
         # Return any remaining values after the workers finished.
-        while not worker_queue.empty():
+        while not worker_queue.empty():  # pragma: NO COVER
             try:
                 # Include a timeout because even though the queue is
                 # non-empty, it doesn't guarantee that a subsequent call to
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index ca6ccefe7167..442420a7191b 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2932,9 +2932,7 @@ def to_arrow(self, progress_bar_type=None):
 
         .. versionadded:: 1.17.0
         """
-        return self.result().to_arrow(
-            progress_bar_type=progress_bar_type,
-        )
+        return self.result().to_arrow(progress_bar_type=progress_bar_type)
 
     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 277c72c59f39..b126f9317dd5 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1466,11 +1466,8 @@ def to_arrow(self, progress_bar_type=None):
             # Indicate that the download has finished.
             progress_bar.close()
 
-        # Avoid concatenating an empty list without a schema.
-        if not record_batches:
-            arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
-            return pyarrow.Table.from_batches([], schema=arrow_schema)
-        return pyarrow.Table.from_batches(record_batches)
+        arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
+        return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)
 
     def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
         """Create an iterable of pandas DataFrames, to process the table as a stream.
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 9bd4445637e8..bcc2113e5121 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -42,7 +42,7 @@
     "pandas": ["pandas>=0.17.1"],
     # Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [ - "pyarrow>=0.4.1" + "pyarrow>=0.4.1, != 0.14.0" ], "tqdm": ["tqdm >= 4.0.0, <5.0.0dev"], "fastparquet": ["fastparquet", "python-snappy"], diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 10189a6d3f2f..62902cd7a71b 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -293,6 +293,51 @@ def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): assert actual.equals(expected) +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_data_type_w_array_struct(module_under_test, bq_type): + fields = ( + schema.SchemaField("field01", "STRING"), + schema.SchemaField("field02", "BYTES"), + schema.SchemaField("field03", "INTEGER"), + schema.SchemaField("field04", "INT64"), + schema.SchemaField("field05", "FLOAT"), + schema.SchemaField("field06", "FLOAT64"), + schema.SchemaField("field07", "NUMERIC"), + schema.SchemaField("field08", "BOOLEAN"), + schema.SchemaField("field09", "BOOL"), + schema.SchemaField("field10", "TIMESTAMP"), + schema.SchemaField("field11", "DATE"), + schema.SchemaField("field12", "TIME"), + schema.SchemaField("field13", "DATETIME"), + schema.SchemaField("field14", "GEOGRAPHY"), + ) + field = schema.SchemaField("ignored_name", bq_type, mode="REPEATED", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + expected_value_type = pyarrow.struct( + ( + pyarrow.field("field01", pyarrow.string()), + pyarrow.field("field02", pyarrow.binary()), + pyarrow.field("field03", pyarrow.int64()), + pyarrow.field("field04", pyarrow.int64()), + pyarrow.field("field05", pyarrow.float64()), + pyarrow.field("field06", pyarrow.float64()), + pyarrow.field("field07", module_under_test.pyarrow_numeric()), + pyarrow.field("field08", pyarrow.bool_()), + pyarrow.field("field09", pyarrow.bool_()), + pyarrow.field("field10", module_under_test.pyarrow_timestamp()), + pyarrow.field("field11", pyarrow.date32()), + pyarrow.field("field12", module_under_test.pyarrow_time()), + pyarrow.field("field13", module_under_test.pyarrow_datetime()), + pyarrow.field("field14", pyarrow.string()), + ) + ) + assert pyarrow.types.is_list(actual) + assert pyarrow.types.is_struct(actual.value_type) + assert actual.value_type.num_children == len(fields) + assert actual.value_type.equals(expected_value_type) + + @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test): fields = ( @@ -303,8 +348,14 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test): schema.SchemaField("field3", "UNKNOWN_TYPE"), ) field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) - actual = module_under_test.bq_to_arrow_data_type(field) + + with warnings.catch_warnings(record=True) as warned: + actual = module_under_test.bq_to_arrow_data_type(field) + assert actual is None + assert len(warned) == 1 + warning = warned[0] + assert "field3" in str(warning) @pytest.mark.parametrize( @@ -442,6 +493,19 @@ def test_bq_to_arrow_array_w_special_floats(module_under_test): assert roundtrip[3] is None +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_schema_w_unknown_type(module_under_test): + fields = ( + schema.SchemaField("field1", "STRING"), + schema.SchemaField("field2", "INTEGER"), + # 
Don't know what to convert UNKNOWN_TYPE to, let type inference work, + # instead. + schema.SchemaField("field3", "UNKNOWN_TYPE"), + ) + actual = module_under_test.bq_to_arrow_schema(fields) + assert actual is None + + @pytest.mark.skipIf(pandas is None, "Requires `pandas`") @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") def test_dataframe_to_arrow_w_required_fields(module_under_test): diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 3561fb857647..22809c245d4b 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -24,6 +24,11 @@ import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None + +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None try: from google.cloud import bigquery_storage_v1beta1 except (ImportError, AttributeError): # pragma: NO COVER @@ -4708,6 +4713,96 @@ def test_reload_w_alternate_client(self): ) self._verifyResourceProperties(job, RESOURCE) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow(self): + begun_resource = self._make_resource() + query_resource = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "totalRows": "4", + "schema": { + "fields": [ + { + "name": "spouse_1", + "type": "RECORD", + "fields": [ + {"name": "name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, + ], + }, + { + "name": "spouse_2", + "type": "RECORD", + "fields": [ + {"name": "name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, + ], + }, + ] + }, + } + tabledata_resource = { + "rows": [ + { + "f": [ + {"v": {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}}, + {"v": {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}}, + ] + }, + { + "f": [ + {"v": {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}}, + {"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}}, + ] + }, + ] + } + done_resource = copy.deepcopy(begun_resource) + done_resource["status"] = {"state": "DONE"} + connection = _make_connection( + begun_resource, query_resource, done_resource, tabledata_resource + ) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + tbl = job.to_arrow() + + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 2) + + # Check the schema. + self.assertEqual(tbl.schema[0].name, "spouse_1") + self.assertEqual(tbl.schema[0].type[0].name, "name") + self.assertEqual(tbl.schema[0].type[1].name, "age") + self.assertTrue(pyarrow.types.is_struct(tbl.schema[0].type)) + self.assertTrue(pyarrow.types.is_string(tbl.schema[0].type[0].type)) + self.assertTrue(pyarrow.types.is_int64(tbl.schema[0].type[1].type)) + self.assertEqual(tbl.schema[1].name, "spouse_2") + self.assertEqual(tbl.schema[1].type[0].name, "name") + self.assertEqual(tbl.schema[1].type[1].name, "age") + self.assertTrue(pyarrow.types.is_struct(tbl.schema[1].type)) + self.assertTrue(pyarrow.types.is_string(tbl.schema[1].type[0].type)) + self.assertTrue(pyarrow.types.is_int64(tbl.schema[1].type[1].type)) + + # Check the data. 
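+        # ``to_pydict()`` is column-oriented: each STRUCT column comes back
+        # as a list of Python dicts, one dict per row.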
+ tbl_data = tbl.to_pydict() + spouse_1 = tbl_data["spouse_1"] + self.assertEqual( + spouse_1, + [ + {"name": "Phred Phlyntstone", "age": 32}, + {"name": "Bhettye Rhubble", "age": 27}, + ], + ) + spouse_2 = tbl_data["spouse_2"] + self.assertEqual( + spouse_2, + [ + {"name": "Wylma Phlyntstone", "age": 29}, + {"name": "Bharney Rhubble", "age": 33}, + ], + ) + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): begun_resource = self._make_resource() @@ -4721,17 +4816,19 @@ def test_to_dataframe(self): {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, ] }, + } + tabledata_resource = { "rows": [ {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ], + ] } done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} connection = _make_connection( - begun_resource, query_resource, done_resource, query_resource + begun_resource, query_resource, done_resource, tabledata_resource ) client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 8585cd73ca69..95a6f8b62fa5 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -34,6 +34,12 @@ except (ImportError, AttributeError): # pragma: NO COVER pandas = None +try: + import pyarrow + import pyarrow.types +except ImportError: # pragma: NO COVER + pyarrow = None + try: from tqdm import tqdm except (ImportError, AttributeError): # pragma: NO COVER @@ -1379,11 +1385,14 @@ def test_to_dataframe(self): class TestRowIterator(unittest.TestCase): + def _class_under_test(self): + from google.cloud.bigquery.table import RowIterator + + return RowIterator + def _make_one( self, client=None, api_request=None, path=None, schema=None, **kwargs ): - from google.cloud.bigquery.table import RowIterator - if client is None: client = _mock_client() @@ -1396,7 +1405,7 @@ def _make_one( if schema is None: schema = [] - return RowIterator(client, api_request, path, schema, **kwargs) + return self._class_under_test()(client, api_request, path, schema, **kwargs) def test_constructor(self): from google.cloud.bigquery.table import _item_to_row @@ -1489,6 +1498,212 @@ def test_page_size(self): query_params={"maxResults": row_iterator._page_size}, ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow(self): + from google.cloud.bigquery.table import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + SchemaField( + "child", + "RECORD", + mode="REPEATED", + fields=[ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ], + ), + ] + rows = [ + { + "f": [ + {"v": "Bharney Rhubble"}, + {"v": "33"}, + { + "v": [ + {"v": {"f": [{"v": "Whamm-Whamm Rhubble"}, {"v": "3"}]}}, + {"v": {"f": [{"v": "Hoppy"}, {"v": "1"}]}}, + ] + }, + ] + }, + { + "f": [ + {"v": "Wylma Phlyntstone"}, + {"v": "29"}, + { + "v": [ + {"v": {"f": [{"v": "Bepples Phlyntstone"}, {"v": "0"}]}}, + {"v": {"f": [{"v": "Dino"}, {"v": "4"}]}}, + ] + }, + ] + }, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + tbl = row_iterator.to_arrow() + + self.assertIsInstance(tbl, 
pyarrow.Table) + self.assertEqual(tbl.num_rows, 2) + + # Check the schema. + self.assertEqual(tbl.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(tbl.schema[0].type)) + self.assertEqual(tbl.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(tbl.schema[1].type)) + child_field = tbl.schema[2] + self.assertEqual(child_field.name, "child") + self.assertTrue(pyarrow.types.is_list(child_field.type)) + self.assertTrue(pyarrow.types.is_struct(child_field.type.value_type)) + self.assertEqual(child_field.type.value_type[0].name, "name") + self.assertEqual(child_field.type.value_type[1].name, "age") + + # Check the data. + tbl_data = tbl.to_pydict() + names = tbl_data["name"] + ages = tbl_data["age"] + children = tbl_data["child"] + self.assertEqual(names, ["Bharney Rhubble", "Wylma Phlyntstone"]) + self.assertEqual(ages, [33, 29]) + self.assertEqual( + children, + [ + [ + {"name": "Whamm-Whamm Rhubble", "age": 3}, + {"name": "Hoppy", "age": 1}, + ], + [{"name": "Bepples Phlyntstone", "age": 0}, {"name": "Dino", "age": 4}], + ], + ) + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_w_unknown_type(self): + from google.cloud.bigquery.table import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + SchemaField("sport", "UNKNOWN_TYPE", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}, {"v": "volleyball"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}, {"v": "basketball"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + tbl = row_iterator.to_arrow() + + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 2) + + # Check the schema. + self.assertEqual(tbl.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(tbl.schema[0].type)) + self.assertEqual(tbl.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(tbl.schema[1].type)) + self.assertEqual(tbl.schema[2].name, "sport") + + # Check the data. + tbl_data = tbl.to_pydict() + names = tbl_data["name"] + ages = tbl_data["age"] + sports = tbl_data["sport"] + self.assertEqual(names, ["Bharney Rhubble", "Wylma Phlyntstone"]) + self.assertEqual(ages, [33, 29]) + self.assertEqual(sports, ["volleyball", "basketball"]) + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_w_empty_table(self): + from google.cloud.bigquery.table import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + SchemaField( + "child", + "RECORD", + mode="REPEATED", + fields=[ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ], + ), + ] + rows = [] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + tbl = row_iterator.to_arrow() + + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 0) + + # Check the schema. 
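+        # Even with zero rows, the Arrow schema is fully typed, because
+        # ``to_arrow()`` builds it from the BigQuery schema with
+        # ``bq_to_arrow_schema()`` instead of inferring it from data.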
+ self.assertEqual(tbl.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(tbl.schema[0].type)) + self.assertEqual(tbl.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(tbl.schema[1].type)) + child_field = tbl.schema[2] + self.assertEqual(child_field.name, "child") + self.assertTrue(pyarrow.types.is_list(child_field.type)) + self.assertTrue(pyarrow.types.is_struct(child_field.type.value_type)) + self.assertEqual(child_field.type.value_type[0].name, "name") + self.assertEqual(child_field.type.value_type[1].name, "age") + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf(tqdm is None, "Requires `tqdm`") + @mock.patch("tqdm.tqdm_gui") + @mock.patch("tqdm.tqdm_notebook") + @mock.patch("tqdm.tqdm") + def test_to_arrow_progress_bar(self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock): + from google.cloud.bigquery.table import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + + progress_bars = ( + ("tqdm", tqdm_mock), + ("tqdm_notebook", tqdm_notebook_mock), + ("tqdm_gui", tqdm_gui_mock), + ) + + for progress_bar_type, progress_bar_mock in progress_bars: + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + tbl = row_iterator.to_arrow(progress_bar_type=progress_bar_type) + + progress_bar_mock.assert_called() + progress_bar_mock().update.assert_called() + progress_bar_mock().close.assert_called_once() + self.assertEqual(tbl.num_rows, 4) + + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_w_pyarrow_none(self): + schema = [] + rows = [] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + with self.assertRaises(ValueError): + row_iterator.to_arrow() + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): from google.cloud.bigquery.table import SchemaField @@ -1631,10 +1846,17 @@ def test_to_dataframe_tqdm_error(self): for progress_bar_type in ("tqdm", "tqdm_notebook", "tqdm_gui"): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) + + with warnings.catch_warnings(record=True) as warned: + df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) self.assertEqual(len(df), 4) # all should be well + # Warn that a progress bar was requested, but creating the tqdm + # progress bar failed. 
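+            # (``_get_progress_bar`` emits ``_NO_TQDM_ERROR`` via
+            # ``warnings.warn(..., UserWarning)`` and returns None when
+            # creating the tqdm progress bar fails.)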
+ for warning in warned: + self.assertIs(warning.category, UserWarning) + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_empty_results(self): from google.cloud.bigquery.table import SchemaField From 3a5fdda642bf6eaa1e694496432d155c423b714f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 8 Jul 2019 15:21:50 -0700 Subject: [PATCH 4/9] Add comment for excluding pyarrow 0.14.0 --- bigquery/setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigquery/setup.py b/bigquery/setup.py index bcc2113e5121..5637c0f4fd53 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -42,6 +42,8 @@ "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. 'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [ + # Bad Linux release for 0.14.0. + # https://issues.apache.org/jira/browse/ARROW-5868 "pyarrow>=0.4.1, != 0.14.0" ], "tqdm": ["tqdm >= 4.0.0, <5.0.0dev"], From b9cb3ed857d1a4e97dfd2cba0fa6a95c7169eb77 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jul 2019 09:40:54 -0700 Subject: [PATCH 5/9] Test for nullable data in to_arrow. --- bigquery/tests/unit/test_table.py | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 95a6f8b62fa5..ffacf77b5278 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1580,6 +1580,39 @@ def test_to_arrow(self): ], ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow_w_nulls(self): + from google.cloud.bigquery.table import SchemaField + + schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] + rows = [ + {"f": [{"v": "Donkey"}, {"v": 32}]}, + {"f": [{"v": "Diddy"}, {"v": 29}]}, + {"f": [{"v": "Dixie"}, {"v": None}]}, + {"f": [{"v": None}, {"v": 111}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + tbl = row_iterator.to_arrow() + + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 4) + + # Check the schema. + self.assertEqual(tbl.schema[0].name, "name") + self.assertTrue(pyarrow.types.is_string(tbl.schema[0].type)) + self.assertEqual(tbl.schema[1].name, "age") + self.assertTrue(pyarrow.types.is_int64(tbl.schema[1].type)) + + # Check the data. + tbl_data = tbl.to_pydict() + names = tbl_data["name"] + ages = tbl_data["age"] + self.assertEqual(names, ["Donkey", "Diddy", "Dixie", None]) + self.assertEqual(ages, [32, 29, None, 111]) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_unknown_type(self): from google.cloud.bigquery.table import SchemaField From 3a597938fcba936c2e90597cc0843c1e3aa03c89 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jul 2019 09:43:32 -0700 Subject: [PATCH 6/9] Correct docstring for bq_to_arrow_schema. --- bigquery/google/cloud/bigquery/_pandas_helpers.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 4ba6257ec1d8..5a3a9833b572 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -146,9 +146,7 @@ def bq_to_arrow_field(bq_field): def bq_to_arrow_schema(bq_schema): """Return the Arrow schema, corresponding to a given BigQuery schema. - Raises: - ValueError: - If the Arrow type of any column cannot be determined. 
+ Returns None if any Arrow type cannot be determined. """ arrow_fields = [] for bq_field in bq_schema: From f8b96d5772fde8e62a88c2ca8776e7ecd5070660 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jul 2019 09:58:04 -0700 Subject: [PATCH 7/9] Bad wheels have been removed from PyPI. --- bigquery/setup.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index 5637c0f4fd53..9bd4445637e8 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -42,9 +42,7 @@ "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. 'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [ - # Bad Linux release for 0.14.0. - # https://issues.apache.org/jira/browse/ARROW-5868 - "pyarrow>=0.4.1, != 0.14.0" + "pyarrow>=0.4.1" ], "tqdm": ["tqdm >= 4.0.0, <5.0.0dev"], "fastparquet": ["fastparquet", "python-snappy"], From ec2afe28ff1c1bda71c5ebc9b98695bd9f93d7cb Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jul 2019 10:16:30 -0700 Subject: [PATCH 8/9] Add to_arrow to EmptyRowIterator. --- bigquery/google/cloud/bigquery/table.py | 15 +++++++++++++++ bigquery/tests/unit/test_table.py | 13 +++++++++++++ 2 files changed, 28 insertions(+) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index b126f9317dd5..8aa7788acdfa 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1613,6 +1613,21 @@ class _EmptyRowIterator(object): pages = () total_rows = 0 + def to_arrow(self, progress_bar_type=None): + """[Beta] Create an empty class:`pyarrow.Table`. + + Args: + progress_bar_type (Optional[str]): + Ignored. Added for compatibility with RowIterator. + + Returns: + pyarrow.Table: + An empty :class:`pyarrow.Table`. + """ + if pyarrow is None: + raise ValueError(_NO_PYARROW_ERROR) + return pyarrow.Table.from_arrays(()) + def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): """Create an empty dataframe. diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index ffacf77b5278..a892dccf9f28 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1370,6 +1370,19 @@ def test_total_rows_eq_zero(self): row_iterator = self._make_one() self.assertEqual(row_iterator.total_rows, 0) + @mock.patch("google.cloud.bigquery.table.pyarrow", new=None) + def test_to_arrow_error_if_pyarrow_is_none(self): + row_iterator = self._make_one() + with self.assertRaises(ValueError): + row_iterator.to_arrow() + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_arrow(self): + row_iterator = self._make_one() + tbl = row_iterator.to_arrow() + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 0) + @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): row_iterator = self._make_one() From 9ae3572c14bb0fd80a0fb9bc65dc9410f91f029b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jul 2019 14:06:45 -0700 Subject: [PATCH 9/9] Revert "Bad wheels have been removed from PyPI." This reverts commit f8b96d5772fde8e62a88c2ca8776e7ecd5070660. --- bigquery/setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index 9bd4445637e8..5637c0f4fd53 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -42,7 +42,9 @@ "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. 
'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [ - "pyarrow>=0.4.1" + # Bad Linux release for 0.14.0. + # https://issues.apache.org/jira/browse/ARROW-5868 + "pyarrow>=0.4.1, != 0.14.0" ], "tqdm": ["tqdm >= 4.0.0, <5.0.0dev"], "fastparquet": ["fastparquet", "python-snappy"],