diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java index 8e1aa0889..ce2f163e5 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java @@ -161,6 +161,14 @@ public void persistRecords(List records) { //throw new RuntimeException(e); log.error("Error marking records as processed"+ e); } + + if(record.isLastRecordInBatch()) { + try { + record.getCommitter().markBatchFinished(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } }); } } catch(Exception e) {