-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add
BulkResponse
wrapper for improved decoding of HTTP bulk responses
CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
- Loading branch information
Showing
5 changed files
with
405 additions
and
284 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import typing as t | ||
from functools import cached_property | ||
|
||
|
||
class BulkResultItem(t.TypedDict): | ||
""" | ||
Define the shape of a CrateDB bulk request response item. | ||
""" | ||
|
||
rowcount: int | ||
|
||
|
||
class BulkResponse: | ||
""" | ||
Manage CrateDB bulk request responses. | ||
Accepts a list of bulk arguments (parameter list) and a list of bulk response items. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations | ||
""" | ||
|
||
def __init__( | ||
self, | ||
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None], | ||
results: t.Union[t.Iterable[BulkResultItem], None]): | ||
self.records = records | ||
self.results = results | ||
|
||
@cached_property | ||
def failed_records(self) -> t.List[t.Dict[str, t.Any]]: | ||
""" | ||
Compute list of failed records. | ||
CrateDB signals failed inserts using `rowcount=-2`. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling | ||
""" | ||
if self.records is None or self.results is None: | ||
return [] | ||
errors: t.List[t.Dict[str, t.Any]] = [] | ||
for record, status in zip(self.records, self.results): | ||
if status["rowcount"] == -2: | ||
errors.append(record) | ||
return errors | ||
|
||
@cached_property | ||
def record_count(self) -> int: | ||
""" | ||
Compute bulk size / length of parameter list. | ||
""" | ||
if not self.records: | ||
return 0 | ||
return len(self.records) | ||
|
||
@cached_property | ||
def success_count(self) -> int: | ||
""" | ||
Compute number of succeeding records within a batch. | ||
""" | ||
return self.record_count - self.failed_count | ||
|
||
@cached_property | ||
def failed_count(self) -> int: | ||
""" | ||
Compute number of failed records within a batch. | ||
""" | ||
return len(self.failed_records) | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import sys | ||
import unittest | ||
|
||
from crate import client | ||
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline | ||
from crate.testing.settings import crate_host | ||
|
||
|
||
class BulkOperationTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
setUpCrateLayerBaseline(self) | ||
|
||
def tearDown(self): | ||
tearDownDropEntitiesBaseline(self) | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_executemany_with_bulk_response(self): | ||
|
||
# Import at runtime is on purpose, to permit skipping. | ||
from crate.client.result import BulkResponse | ||
|
||
connection = client.connect(crate_host) | ||
cursor = connection.cursor() | ||
|
||
# Run SQL DDL and a first valid INSERT operation. | ||
cursor.execute("CREATE TABLE foobar (id INTEGER);") | ||
_ = cursor.executemany("INSERT INTO foobar (id) VALUES (?)", [(1, ), (2, ), (3, )]) | ||
cursor.execute("REFRESH TABLE foobar;") | ||
|
||
# Run an invalid INSERT operation that will fail. | ||
invalid_records = [(4, ), ("Hotzenplotz", )] | ||
result = cursor.executemany("INSERT INTO foobar (id) VALUES (?)", invalid_records) | ||
|
||
# Verify CrateDB response. | ||
self.assertEqual(result, [{"rowcount": -2}, {"rowcount": -2}]) | ||
|
||
# Verify decoded response. | ||
bulk_response = BulkResponse(invalid_records, result) | ||
self.assertEqual(bulk_response.failed_records, invalid_records) | ||
|
||
cursor.execute("REFRESH TABLE foobar;") | ||
cursor.execute("SELECT * FROM foobar;") | ||
result = cursor.fetchall() | ||
self.assertEqual(sorted(result), [[1], [2], [3]]) | ||
|
||
cursor.close() | ||
connection.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,273 @@ | ||
# -*- coding: utf-8; -*- | ||
# | ||
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor | ||
# license agreements. See the NOTICE file distributed with this work for | ||
# additional information regarding copyright ownership. Crate licenses | ||
# this file to you under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. You may | ||
# obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
# However, if you have executed another commercial license agreement | ||
# with Crate these terms will supersede the license and you may use the | ||
# software solely pursuant to the terms of the relevant commercial agreement. | ||
|
||
from __future__ import absolute_import | ||
|
||
import json | ||
import os | ||
import socket | ||
import unittest | ||
from pprint import pprint | ||
from http.server import HTTPServer, BaseHTTPRequestHandler | ||
import ssl | ||
import time | ||
import threading | ||
import logging | ||
|
||
import stopit | ||
|
||
from crate.testing.layer import CrateLayer | ||
from crate.testing.settings import \ | ||
crate_host, crate_path, crate_port, \ | ||
crate_transport_port, docs_path, localhost | ||
from crate.client import connect | ||
|
||
|
||
makeSuite = unittest.TestLoader().loadTestsFromTestCase | ||
|
||
log = logging.getLogger('crate.testing.layer') | ||
ch = logging.StreamHandler() | ||
ch.setLevel(logging.ERROR) | ||
log.addHandler(ch) | ||
|
||
|
||
def cprint(s): | ||
if isinstance(s, bytes): | ||
s = s.decode('utf-8') | ||
print(s) | ||
|
||
|
||
settings = { | ||
'udc.enabled': 'false', | ||
'lang.js.enabled': 'true', | ||
'auth.host_based.enabled': 'true', | ||
'auth.host_based.config.0.user': 'crate', | ||
'auth.host_based.config.0.method': 'trust', | ||
'auth.host_based.config.98.user': 'trusted_me', | ||
'auth.host_based.config.98.method': 'trust', | ||
'auth.host_based.config.99.user': 'me', | ||
'auth.host_based.config.99.method': 'password', | ||
} | ||
crate_layer = None | ||
|
||
|
||
def ensure_cratedb_layer(): | ||
""" | ||
In order to skip individual tests by manually disabling them within | ||
`def test_suite()`, it is crucial make the test layer not run on each | ||
and every occasion. So, things like this will be possible:: | ||
./bin/test -vvvv --ignore_dir=testing | ||
TODO: Through a subsequent patch, the possibility to individually | ||
unselect specific tests might be added to `def test_suite()` | ||
on behalf of environment variables. | ||
A blueprint for this kind of logic can be found at | ||
https://github.com/crate/crate/commit/414cd833. | ||
""" | ||
global crate_layer | ||
|
||
if crate_layer is None: | ||
crate_layer = CrateLayer('crate', | ||
crate_home=crate_path(), | ||
port=crate_port, | ||
host=localhost, | ||
transport_port=crate_transport_port, | ||
settings=settings) | ||
return crate_layer | ||
|
||
|
||
def setUpCrateLayerBaseline(test): | ||
if hasattr(test, "globs"): | ||
test.globs['crate_host'] = crate_host | ||
test.globs['pprint'] = pprint | ||
test.globs['print'] = cprint | ||
|
||
with connect(crate_host) as conn: | ||
cursor = conn.cursor() | ||
|
||
with open(docs_path('testing/testdata/mappings/locations.sql')) as s: | ||
stmt = s.read() | ||
cursor.execute(stmt) | ||
stmt = ("select count(*) from information_schema.tables " | ||
"where table_name = 'locations'") | ||
cursor.execute(stmt) | ||
assert cursor.fetchall()[0][0] == 1 | ||
|
||
data_path = docs_path('testing/testdata/data/test_a.json') | ||
# load testing data into crate | ||
cursor.execute("copy locations from ?", (data_path,)) | ||
# refresh location table so imported data is visible immediately | ||
cursor.execute("refresh table locations") | ||
# create blob table | ||
cursor.execute("create blob table myfiles clustered into 1 shards " + | ||
"with (number_of_replicas=0)") | ||
|
||
# create users | ||
cursor.execute("CREATE USER me WITH (password = 'my_secret_pw')") | ||
cursor.execute("CREATE USER trusted_me") | ||
|
||
cursor.close() | ||
|
||
|
||
def tearDownDropEntitiesBaseline(test): | ||
""" | ||
Drop all tables, views, and users created by `setUpWithCrateLayer*`. | ||
""" | ||
ddl_statements = [ | ||
"DROP TABLE foobar", | ||
"DROP TABLE locations", | ||
"DROP BLOB TABLE myfiles", | ||
"DROP USER me", | ||
"DROP USER trusted_me", | ||
] | ||
_execute_statements(ddl_statements) | ||
|
||
|
||
class HttpsTestServerLayer: | ||
PORT = 65534 | ||
HOST = "localhost" | ||
CERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), | ||
"pki/server_valid.pem")) | ||
CACERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), | ||
"pki/cacert_valid.pem")) | ||
|
||
__name__ = "httpsserver" | ||
__bases__ = tuple() | ||
|
||
class HttpsServer(HTTPServer): | ||
def get_request(self): | ||
|
||
# Prepare SSL context. | ||
context = ssl._create_unverified_context( | ||
protocol=ssl.PROTOCOL_TLS_SERVER, | ||
cert_reqs=ssl.CERT_OPTIONAL, | ||
check_hostname=False, | ||
purpose=ssl.Purpose.CLIENT_AUTH, | ||
certfile=HttpsTestServerLayer.CERT_FILE, | ||
keyfile=HttpsTestServerLayer.CERT_FILE, | ||
cafile=HttpsTestServerLayer.CACERT_FILE) | ||
|
||
# Set minimum protocol version, TLSv1 and TLSv1.1 are unsafe. | ||
context.minimum_version = ssl.TLSVersion.TLSv1_2 | ||
|
||
# Wrap TLS encryption around socket. | ||
socket, client_address = HTTPServer.get_request(self) | ||
socket = context.wrap_socket(socket, server_side=True) | ||
|
||
return socket, client_address | ||
|
||
class HttpsHandler(BaseHTTPRequestHandler): | ||
|
||
payload = json.dumps({"name": "test", "status": 200, }) | ||
|
||
def do_GET(self): | ||
self.send_response(200) | ||
payload = self.payload.encode('UTF-8') | ||
self.send_header("Content-Length", len(payload)) | ||
self.send_header("Content-Type", "application/json; charset=UTF-8") | ||
self.end_headers() | ||
self.wfile.write(payload) | ||
|
||
def setUp(self): | ||
self.server = self.HttpsServer( | ||
(self.HOST, self.PORT), | ||
self.HttpsHandler | ||
) | ||
thread = threading.Thread(target=self.serve_forever) | ||
thread.daemon = True # quit interpreter when only thread exists | ||
thread.start() | ||
self.waitForServer() | ||
|
||
def serve_forever(self): | ||
print("listening on", self.HOST, self.PORT) | ||
self.server.serve_forever() | ||
print("server stopped.") | ||
|
||
def tearDown(self): | ||
self.server.shutdown() | ||
self.server.server_close() | ||
|
||
def isUp(self): | ||
""" | ||
Test if a host is up. | ||
""" | ||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
ex = s.connect_ex((self.HOST, self.PORT)) | ||
s.close() | ||
return ex == 0 | ||
|
||
def waitForServer(self, timeout=5): | ||
""" | ||
Wait for the host to be available. | ||
""" | ||
with stopit.ThreadingTimeout(timeout) as to_ctx_mgr: | ||
while True: | ||
if self.isUp(): | ||
break | ||
time.sleep(0.001) | ||
|
||
if not to_ctx_mgr: | ||
raise TimeoutError("Could not properly start embedded webserver " | ||
"within {} seconds".format(timeout)) | ||
|
||
|
||
def setUpWithHttps(test): | ||
test.globs['crate_host'] = "https://{0}:{1}".format( | ||
HttpsTestServerLayer.HOST, HttpsTestServerLayer.PORT | ||
) | ||
test.globs['pprint'] = pprint | ||
test.globs['print'] = cprint | ||
|
||
test.globs['cacert_valid'] = os.path.abspath( | ||
os.path.join(os.path.dirname(__file__), "pki/cacert_valid.pem") | ||
) | ||
test.globs['cacert_invalid'] = os.path.abspath( | ||
os.path.join(os.path.dirname(__file__), "pki/cacert_invalid.pem") | ||
) | ||
test.globs['clientcert_valid'] = os.path.abspath( | ||
os.path.join(os.path.dirname(__file__), "pki/client_valid.pem") | ||
) | ||
test.globs['clientcert_invalid'] = os.path.abspath( | ||
os.path.join(os.path.dirname(__file__), "pki/client_invalid.pem") | ||
) | ||
|
||
|
||
def _execute_statements(statements, on_error="ignore"): | ||
with connect(crate_host) as conn: | ||
cursor = conn.cursor() | ||
for stmt in statements: | ||
_execute_statement(cursor, stmt, on_error=on_error) | ||
cursor.close() | ||
|
||
|
||
def _execute_statement(cursor, stmt, on_error="ignore"): | ||
try: | ||
cursor.execute(stmt) | ||
except Exception: # pragma: no cover | ||
# FIXME: Why does this croak on statements like ``DROP TABLE cities``? | ||
# Note: When needing to debug the test environment, you may want to | ||
# enable this logger statement. | ||
# log.exception("Executing SQL statement failed") | ||
if on_error == "ignore": | ||
pass | ||
elif on_error == "raise": | ||
raise | ||
Oops, something went wrong.