Skip to content

Commit

Permalink
IO: Improve BulkProcessor when running per-record operations
Browse files Browse the repository at this point in the history
... by also checking `rowcount` for handling `INSERT OK, 0 rows`
responses.
  • Loading branch information
amotl committed Oct 9, 2024
1 parent a9c70a3 commit e76a173
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


## Unreleased
- IO: Improved `BulkProcessor` when running per-record operations by
also checking `rowcount` for handling `INSERT OK, 0 rows` responses

## 2024/10/01 v0.0.27
- MongoDB: Updated to pymongo 4.9
Expand Down
23 changes: 15 additions & 8 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
return []
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.parameters, self.cratedb_bulk_result):
if status["rowcount"] == -2:
if status["rowcount"] != 1:
errors.append(record)
return errors

Expand Down Expand Up @@ -143,12 +143,17 @@ def start(self) -> BulkMetrics:
try:
cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
self.connection.commit()
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
self._metrics.count_success_total += bulk_response.success_count
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)
if cursor.rowcount > 0:
cratedb_bulk_result = getattr(cursor.context, "last_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
self._metrics.count_success_total += bulk_response.success_count
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)
else:
failed_records = operation.parameters
count_success_local = 0
self.progress_bar and self.progress_bar.update(n=1)

# When a batch is of size one, an exception is raised.
# Just signal the same condition as if a batch would have failed.
Expand All @@ -165,8 +170,10 @@ def start(self) -> BulkMetrics:
)
for record in failed_records:
try:
self.connection.execute(statement=statement, parameters=record)
cursor = self.connection.execute(statement=statement, parameters=record)
self.connection.commit()
if cursor.rowcount != 1:
raise IOError("Record has not been processed")
self._metrics.count_success_total += 1
except Exception as ex:
logger.error(f"Operation failed: {ex}")
Expand Down
3 changes: 0 additions & 3 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

Expand All @@ -30,8 +29,6 @@ def __init__(
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname
Expand Down
3 changes: 0 additions & 3 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand All @@ -33,8 +32,6 @@ def __init__(
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down
Empty file.
19 changes: 0 additions & 19 deletions cratedb_toolkit/sqlalchemy/patch.py

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies = [
"python-dotenv<2",
"python-slugify<9",
"pyyaml<7",
"sqlalchemy-cratedb>=0.37,<1",
"sqlalchemy-cratedb>=0.40,<1",
"sqlparse<0.6",
"tqdm<5",
'typing-extensions<5; python_version <= "3.7"',
Expand Down
44 changes: 43 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_http_json_relaxed(caplog, cratedb):
def test_mongodb_copy_http_json_relaxed_books(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP.
"""
Expand Down Expand Up @@ -355,3 +355,45 @@ def test_mongodb_copy_http_json_relaxed(caplog, cratedb):
"Charlie Collins",
"Robi Sen",
]


def test_mongodb_copy_http_json_relaxed_products(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP.
`datasets/products.json` includes one invalid record.
"""

# Define source and target URLs.
json_resource = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/products.json"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Run transfer command.
jqlang_transformation = """
.[] |= (
select(true)
| if (.for) then .for |= to_array end
| if (.type) then .type |= to_array end
| if (.limits.data.n) then .limits.data.n |= tostring end
| if (.limits.sms.n) then .limits.sms.n |= tostring end
| if (.limits.voice.n) then .limits.voice.n |= tostring end
)
"""
transformation = TransformationProject().add(
CollectionTransformation(
address=CollectionAddress(container="datasets", name="products"),
pre=MokshaTransformation().jq(jqlang_transformation),
)
)
mongodb_copy(json_resource, cratedb_url, transformation=transformation)

# Verify metadata in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 10

# Verify content in target database.
results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 'ac3';", records=True)
assert results[0]["data"]["name"] == "AC3 Phone"

assert "Bulk processor metrics: BulkMetrics(count_success_total=10, count_error_total=1" in caplog.text

0 comments on commit e76a173

Please sign in to comment.