-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add
BulkResponse
wrapper for improved decoding of HTTP bulk responses
CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
- Loading branch information
Showing
5 changed files
with
427 additions
and
284 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import typing as t | ||
from functools import cached_property | ||
|
||
|
||
class BulkResultItem(t.TypedDict): | ||
""" | ||
Define the shape of a CrateDB bulk request response item. | ||
""" | ||
|
||
rowcount: int | ||
|
||
|
||
class BulkResponse: | ||
""" | ||
Manage CrateDB bulk request responses. | ||
Accepts a list of bulk arguments (parameter list) and a list of bulk response items. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations | ||
""" | ||
|
||
def __init__( | ||
self, | ||
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None], | ||
results: t.Union[t.Iterable[BulkResultItem], None]): | ||
self.records = records | ||
self.results = results | ||
|
||
@cached_property | ||
def failed_records(self) -> t.List[t.Dict[str, t.Any]]: | ||
""" | ||
Compute list of failed records. | ||
CrateDB signals failed inserts using `rowcount=-2`. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling | ||
""" | ||
if self.records is None or self.results is None: | ||
return [] | ||
errors: t.List[t.Dict[str, t.Any]] = [] | ||
for record, status in zip(self.records, self.results): | ||
if status["rowcount"] == -2: | ||
errors.append(record) | ||
return errors | ||
|
||
@cached_property | ||
def record_count(self) -> int: | ||
""" | ||
Compute bulk size / length of parameter list. | ||
""" | ||
if not self.records: | ||
return 0 | ||
return len(self.records) | ||
|
||
@cached_property | ||
def success_count(self) -> int: | ||
""" | ||
Compute number of succeeding records within a batch. | ||
""" | ||
return self.record_count - self.failed_count | ||
|
||
@cached_property | ||
def failed_count(self) -> int: | ||
""" | ||
Compute number of failed records within a batch. | ||
""" | ||
return len(self.failed_records) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import sys | ||
import unittest | ||
|
||
from crate import client | ||
from crate.client.exceptions import ProgrammingError | ||
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline | ||
from crate.testing.settings import crate_host | ||
|
||
|
||
class BulkOperationTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
setUpCrateLayerBaseline(self) | ||
|
||
def tearDown(self): | ||
tearDownDropEntitiesBaseline(self) | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_executemany_with_bulk_response_partial(self): | ||
|
||
# Import at runtime is on purpose, to permit skipping the test case. | ||
from crate.client.result import BulkResponse | ||
|
||
connection = client.connect(crate_host) | ||
cursor = connection.cursor() | ||
|
||
# Run SQL DDL. | ||
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);") | ||
|
||
# Run a batch insert that only partially succeeds. | ||
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")] | ||
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records) | ||
|
||
# Verify CrateDB response. | ||
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}]) | ||
|
||
# Verify decoded response. | ||
bulk_response = BulkResponse(invalid_records, result) | ||
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")]) | ||
self.assertEqual(bulk_response.record_count, 2) | ||
self.assertEqual(bulk_response.success_count, 1) | ||
self.assertEqual(bulk_response.failed_count, 1) | ||
|
||
cursor.execute("REFRESH TABLE foobar;") | ||
cursor.execute("SELECT * FROM foobar;") | ||
result = cursor.fetchall() | ||
self.assertEqual(result, [[1, "Hotzenplotz 1"]]) | ||
|
||
cursor.close() | ||
connection.close() | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_executemany_empty(self): | ||
|
||
connection = client.connect(crate_host) | ||
cursor = connection.cursor() | ||
|
||
# Run SQL DDL. | ||
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);") | ||
|
||
# Run a batch insert that is empty. | ||
with self.assertRaises(ProgrammingError) as cm: | ||
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", []) | ||
self.assertEqual( | ||
cm.exception.message, | ||
"SQLParseException[The query contains a parameter placeholder $1, " | ||
"but there are only 0 parameter values]") | ||
|
||
cursor.close() | ||
connection.close() |
Oops, something went wrong.