Skip to content

Commit

Permalink
BigQuery: Add client.insert_rows_from_dataframe() method (googleapis#…
Browse files Browse the repository at this point in the history
…9162)

* Add client.insert_rows_from_dataframe() method

* Avoid using nametuples for dataframe row iteration

dataframe.itertuples() returns plain tuples under certain conditions,
thus this commit enforces always returning plain tuples, and
constructs the row dictionary manually from each tuple.

* Skip insert_rows_from_dataframe tests if no Pandas
  • Loading branch information
plamut authored and emar-kar committed Sep 18, 2019
1 parent 76e1d4a commit a671d36
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 0 deletions.
54 changes: 54 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Client for interacting with the Google BigQuery API."""

from __future__ import absolute_import
from __future__ import division

try:
from collections import abc as collections_abc
Expand All @@ -25,7 +26,9 @@
import functools
import gzip
import io
import itertools
import json
import math
import os
import tempfile
import uuid
Expand Down Expand Up @@ -2111,6 +2114,57 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):

return self.insert_rows_json(table, json_rows, **kwargs)

def insert_rows_from_dataframe(
self, table, dataframe, selected_fields=None, chunk_size=500, **kwargs
):
"""Insert rows into a table from a dataframe via the streaming API.
Args:
table (Union[ \
:class:`~google.cloud.bigquery.table.Table`, \
:class:`~google.cloud.bigquery.table.TableReference`, \
str, \
]):
The destination table for the row data, or a reference to it.
dataframe (pandas.DataFrame):
A :class:`~pandas.DataFrame` containing the data to load.
selected_fields (Sequence[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
]):
The fields to return. Required if ``table`` is a
:class:`~google.cloud.bigquery.table.TableReference`.
chunk_size (int):
The number of rows to stream in a single chunk. Must be positive.
kwargs (dict):
Keyword arguments to
:meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
Returns:
Sequence[Sequence[Mappings]]:
A list with insert errors for each insert chunk. Each element
is a list containing one mapping per row with insert errors:
the "index" key identifies the row, and the "errors" key
contains a list of the mappings describing one or more problems
with the row.
Raises:
ValueError: if table's schema is not set
"""
insert_results = []

chunk_count = int(math.ceil(len(dataframe) / chunk_size))
rows_iter = (
dict(six.moves.zip(dataframe.columns, row))
for row in dataframe.itertuples(index=False, name=None)
)

for _ in range(chunk_count):
rows_chunk = itertools.islice(rows_iter, chunk_size)
result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
insert_results.append(result)

return insert_results

def insert_rows_json(
self,
table,
Expand Down
67 changes: 67 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,73 @@ def test_query_results_to_dataframe_w_bqstorage(self):
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_insert_rows_from_dataframe(self):
SF = bigquery.SchemaField
schema = [
SF("float_col", "FLOAT", mode="REQUIRED"),
SF("int_col", "INTEGER", mode="REQUIRED"),
SF("bool_col", "BOOLEAN", mode="REQUIRED"),
SF("string_col", "STRING", mode="NULLABLE"),
]

dataframe = pandas.DataFrame(
[
{
"float_col": 1.11,
"bool_col": True,
"string_col": "my string",
"int_col": 10,
},
{
"float_col": 2.22,
"bool_col": False,
"string_col": "another string",
"int_col": 20,
},
{
"float_col": 3.33,
"bool_col": False,
"string_col": "another string",
"int_col": 30,
},
{
"float_col": 4.44,
"bool_col": True,
"string_col": "another string",
"int_col": 40,
},
{
"float_col": 5.55,
"bool_col": False,
"string_col": "another string",
"int_col": 50,
},
]
)

table_id = "test_table"
dataset = self.temp_dataset(_make_dataset_id("issue_7553"))
table_arg = Table(dataset.table(table_id), schema=schema)
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)

Config.CLIENT.insert_rows_from_dataframe(table, dataframe, chunk_size=3)

retry = RetryResult(_has_rows, max_tries=8)
rows = retry(self._fetch_single_page)(table)

sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
row_tuples = [r.values() for r in sorted_rows]
expected = [tuple(data_row) for data_row in dataframe.itertuples(index=False)]

assert len(row_tuples) == len(expected)

for row, expected_row in zip(row_tuples, expected):
six.assertCountEqual(
self, row, expected_row
) # column order does not matter

def test_insert_rows_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
Expand Down
124 changes: 124 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4473,6 +4473,130 @@ def test_insert_rows_w_numeric(self):
data=sent,
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_insert_rows_from_dataframe(self):
from google.cloud.bigquery.table import SchemaField
from google.cloud.bigquery.table import Table

API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
)

dataframe = pandas.DataFrame(
[
{"name": u"Little One", "age": 10, "adult": False},
{"name": u"Young Gun", "age": 20, "adult": True},
{"name": u"Dad", "age": 30, "adult": True},
{"name": u"Stranger", "age": 40, "adult": True},
]
)

# create client
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = make_connection({}, {})

# create table
schema = [
SchemaField("name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
SchemaField("adult", "BOOLEAN", mode="REQUIRED"),
]
table = Table(self.TABLE_REF, schema=schema)

with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
error_info = client.insert_rows_from_dataframe(
table, dataframe, chunk_size=3
)

self.assertEqual(len(error_info), 2)
for chunk_errors in error_info:
assert chunk_errors == []

EXPECTED_SENT_DATA = [
{
"rows": [
{
"insertId": "0",
"json": {"name": "Little One", "age": "10", "adult": "false"},
},
{
"insertId": "1",
"json": {"name": "Young Gun", "age": "20", "adult": "true"},
},
{
"insertId": "2",
"json": {"name": "Dad", "age": "30", "adult": "true"},
},
]
},
{
"rows": [
{
"insertId": "3",
"json": {"name": "Stranger", "age": "40", "adult": "true"},
}
]
},
]

actual_calls = conn.api_request.call_args_list

for call, expected_data in six.moves.zip_longest(
actual_calls, EXPECTED_SENT_DATA
):
expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
assert call == expected_call

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_insert_rows_from_dataframe_many_columns(self):
from google.cloud.bigquery.table import SchemaField
from google.cloud.bigquery.table import Table

API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
)
N_COLUMNS = 256 # should be >= 256

dataframe = pandas.DataFrame(
[{"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)}]
)

# create client
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = make_connection({}, {})

# create table
schema = [SchemaField("foo_{}".format(i), "STRING") for i in range(N_COLUMNS)]
table = Table(self.TABLE_REF, schema=schema)

with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
error_info = client.insert_rows_from_dataframe(
table, dataframe, chunk_size=3
)

assert len(error_info) == 1
assert error_info[0] == []

EXPECTED_SENT_DATA = {
"rows": [
{
"insertId": "0",
"json": {
"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)
},
}
]
}
expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA)

actual_calls = conn.api_request.call_args_list
assert len(actual_calls) == 1
assert actual_calls[0] == expected_call

def test_insert_rows_json(self):
from google.cloud.bigquery.table import Table, SchemaField
from google.cloud.bigquery.dataset import DatasetReference
Expand Down

0 comments on commit a671d36

Please sign in to comment.