diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index cc53ffa22985..a3c4189691ed 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -15,6 +15,7 @@
 """Client for interacting with the Google BigQuery API."""
 
 from __future__ import absolute_import
+from __future__ import division
 
 try:
     from collections import abc as collections_abc
@@ -25,7 +26,9 @@
 import functools
 import gzip
 import io
+import itertools
 import json
+import math
 import os
 import tempfile
 import uuid
@@ -2080,6 +2083,57 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
 
         return self.insert_rows_json(table, json_rows, **kwargs)
 
+    def insert_rows_from_dataframe(
+        self, table, dataframe, selected_fields=None, chunk_size=500, **kwargs
+    ):
+        """Insert rows into a table from a dataframe via the streaming API.
+
+        Args:
+            table (Union[ \
+                :class:`~google.cloud.bigquery.table.Table`, \
+                :class:`~google.cloud.bigquery.table.TableReference`, \
+                str, \
+            ]):
+                The destination table for the row data, or a reference to it.
+            dataframe (pandas.DataFrame):
+                A :class:`~pandas.DataFrame` containing the data to load.
+            selected_fields (Sequence[ \
+                :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            ]):
+                The fields to return. Required if ``table`` is a
+                :class:`~google.cloud.bigquery.table.TableReference`.
+            chunk_size (int):
+                The number of rows to stream in a single chunk. Must be positive.
+            kwargs (dict):
+                Keyword arguments to
+                :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
+
+        Returns:
+            Sequence[Sequence[Mapping]]:
+                A list with insert errors for each insert chunk. Each element
+                is a list containing one mapping per row with insert errors:
+                the "index" key identifies the row, and the "errors" key
+                contains a list of mappings describing one or more problems
+                with the row.
+
+        Raises:
+            ValueError: If the table's schema is not set.
+        """
+        insert_results = []
+
+        chunk_count = int(math.ceil(len(dataframe) / chunk_size))
+        rows_iter = (
+            dict(six.moves.zip(dataframe.columns, row))
+            for row in dataframe.itertuples(index=False, name=None)
+        )
+
+        for _ in range(chunk_count):
+            rows_chunk = itertools.islice(rows_iter, chunk_size)
+            result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
+            insert_results.append(result)
+
+        return insert_results
+
     def insert_rows_json(
         self,
         table,
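# The method above sizes its loop with math.ceil(len(dataframe) / chunk_size)
# and then drains a shared generator with itertools.islice, so no chunk is
# materialized until it is sent. A self-contained sketch of that chunking
# pattern (a generic helper, not the library's API):
import itertools


def chunks(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk


assert list(chunks(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]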
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 3593e1ecb609..813aa1d9442c 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -1951,6 +1951,73 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                 if not row[col] is None:
                     self.assertIsInstance(row[col], exp_datatypes[col])
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe(self):
+        SF = bigquery.SchemaField
+        schema = [
+            SF("float_col", "FLOAT", mode="REQUIRED"),
+            SF("int_col", "INTEGER", mode="REQUIRED"),
+            SF("bool_col", "BOOLEAN", mode="REQUIRED"),
+            SF("string_col", "STRING", mode="NULLABLE"),
+        ]
+
+        dataframe = pandas.DataFrame(
+            [
+                {
+                    "float_col": 1.11,
+                    "bool_col": True,
+                    "string_col": "my string",
+                    "int_col": 10,
+                },
+                {
+                    "float_col": 2.22,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 20,
+                },
+                {
+                    "float_col": 3.33,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 30,
+                },
+                {
+                    "float_col": 4.44,
+                    "bool_col": True,
+                    "string_col": "another string",
+                    "int_col": 40,
+                },
+                {
+                    "float_col": 5.55,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 50,
+                },
+            ]
+        )
+
+        table_id = "test_table"
+        dataset = self.temp_dataset(_make_dataset_id("issue_7553"))
+        table_arg = Table(dataset.table(table_id), schema=schema)
+        table = retry_403(Config.CLIENT.create_table)(table_arg)
+        self.to_delete.insert(0, table)
+
+        Config.CLIENT.insert_rows_from_dataframe(table, dataframe, chunk_size=3)
+
+        retry = RetryResult(_has_rows, max_tries=8)
+        rows = retry(self._fetch_single_page)(table)
+
+        sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
+        row_tuples = [r.values() for r in sorted_rows]
+        expected = [tuple(data_row) for data_row in dataframe.itertuples(index=False)]
+
+        assert len(row_tuples) == len(expected)
+
+        for row, expected_row in zip(row_tuples, expected):
+            six.assertCountEqual(
+                self, row, expected_row
+            )  # column order does not matter
+
     def test_insert_rows_nested_nested(self):
         # See #2951
         SF = bigquery.SchemaField
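# A self-contained sketch of the row conversion insert_rows_from_dataframe
# relies on: itertuples(index=False, name=None) yields plain tuples
# (namedtuples are capped at 255 fields on older Pythons, which is what the
# wide-frame unit test below guards against), and zipping each tuple with
# dataframe.columns rebuilds one JSON-ready mapping per row. The frame and
# values here are illustrative only:
import pandas

dataframe = pandas.DataFrame(
    [{"name": "Ada", "age": 36}, {"name": "Grace", "age": 45}]
)

rows = (
    dict(zip(dataframe.columns, row))
    for row in dataframe.itertuples(index=False, name=None)
)

assert next(rows) == {"name": "Ada", "age": 36}
assert next(rows) == {"name": "Grace", "age": 45}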
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index f31d8587322b..4cdc413a3263 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -4472,6 +4472,130 @@ def test_insert_rows_w_numeric(self):
             data=sent,
         )
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+
+        dataframe = pandas.DataFrame(
+            [
+                {"name": u"Little One", "age": 10, "adult": False},
+                {"name": u"Young Gun", "age": 20, "adult": True},
+                {"name": u"Dad", "age": 30, "adult": True},
+                {"name": u"Stranger", "age": 40, "adult": True},
+            ]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+            SchemaField("adult", "BOOLEAN", mode="REQUIRED"),
+        ]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        self.assertEqual(len(error_info), 2)
+        for chunk_errors in error_info:
+            assert chunk_errors == []
+
+        EXPECTED_SENT_DATA = [
+            {
+                "rows": [
+                    {
+                        "insertId": "0",
+                        "json": {"name": "Little One", "age": "10", "adult": "false"},
+                    },
+                    {
+                        "insertId": "1",
+                        "json": {"name": "Young Gun", "age": "20", "adult": "true"},
+                    },
+                    {
+                        "insertId": "2",
+                        "json": {"name": "Dad", "age": "30", "adult": "true"},
+                    },
+                ]
+            },
+            {
+                "rows": [
+                    {
+                        "insertId": "3",
+                        "json": {"name": "Stranger", "age": "40", "adult": "true"},
+                    }
+                ]
+            },
+        ]
+
+        actual_calls = conn.api_request.call_args_list
+
+        for call, expected_data in six.moves.zip_longest(
+            actual_calls, EXPECTED_SENT_DATA
+        ):
+            expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
+            assert call == expected_call
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe_many_columns(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+        N_COLUMNS = 256  # should be >= 256, so the frame is wider than namedtuples allow
+
+        dataframe = pandas.DataFrame(
+            [{"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)}]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [SchemaField("foo_{}".format(i), "STRING") for i in range(N_COLUMNS)]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        assert len(error_info) == 1
+        assert error_info[0] == []
+
+        EXPECTED_SENT_DATA = {
+            "rows": [
+                {
+                    "insertId": "0",
+                    "json": {
+                        "foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)
+                    },
+                }
+            ]
+        }
+        expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA)
+
+        actual_calls = conn.api_request.call_args_list
+        assert len(actual_calls) == 1
+        assert actual_calls[0] == expected_call
+
     def test_insert_rows_json(self):
         from google.cloud.bigquery.table import Table, SchemaField
         from google.cloud.bigquery.dataset import DatasetReference
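# A minimal end-to-end usage sketch for the new method. The project, dataset,
# and table names are hypothetical; it assumes an authenticated client and an
# existing table whose schema matches the dataframe's columns:
import pandas

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.my_table")  # hypothetical table ID

dataframe = pandas.DataFrame(
    [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
)

# One list of per-row error mappings is returned for each streamed chunk.
errors = client.insert_rows_from_dataframe(table, dataframe, chunk_size=500)
if any(chunk_errors for chunk_errors in errors):
    print("Some rows failed to insert:", errors)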