diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java index aec3c4bff..6a575ab3f 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java @@ -31,7 +31,7 @@ * Integration test to validate support for replication of multiple databases. */ @Testcontainers -@DisplayName("Integration Test that validates basic replication of MariaDB databases") +@DisplayName("Integration Test that validates basic replication of MariaDB databases in single threaded mode") public class MariaDBIT { @@ -66,7 +66,7 @@ public void startContainers() throws InterruptedException { clickHouseContainer.start(); } - @DisplayName("Integration Test that validates handle of JSON data type from MySQL") + @DisplayName("Integration Test that validates replication of MariaDB databases in single.threaded mode") @Test public void testMultipleDatabases() throws Exception { @@ -77,6 +77,7 @@ public void testMultipleDatabases() throws Exception { // Set the list of databases captured. props.put("database.whitelist", "employees,test_db,test_db2"); props.put("database.include.list", "employees,test_db,test_db2"); + props.put("single.threaded", true); ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java index 8e1aa0889..ce2f163e5 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java @@ -161,6 +161,14 @@ public void persistRecords(List records) { //throw new RuntimeException(e); log.error("Error marking records as processed"+ e); } + + if(record.isLastRecordInBatch()) { + try { + record.getCommitter().markBatchFinished(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } }); } } catch(Exception e) {