Skip to content

Commit

Permalink
Merge pull request #827 from Altinity/801-records-are-not-acknowledge…
Browse files Browse the repository at this point in the history
…d-or-the-offsets-are-not-updated-in-singlethreaded-mode

801 records are not acknowledged or the offsets are not updated in singlethreaded mode
  • Loading branch information
subkanthi authored Sep 18, 2024
2 parents 09d31cc + a603892 commit 7203f57
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Integration test to validate support for replication of multiple databases.
*/
@Testcontainers
@DisplayName("Integration Test that validates basic replication of MariaDB databases")
@DisplayName("Integration Test that validates basic replication of MariaDB databases in single threaded mode")
public class MariaDBIT
{

Expand Down Expand Up @@ -66,7 +66,7 @@ public void startContainers() throws InterruptedException {
clickHouseContainer.start();
}

@DisplayName("Integration Test that validates handle of JSON data type from MySQL")
@DisplayName("Integration Test that validates replication of MariaDB databases in single.threaded mode")
@Test
public void testMultipleDatabases() throws Exception {

Expand All @@ -77,6 +77,7 @@ public void testMultipleDatabases() throws Exception {
// Set the list of databases captured.
props.put("database.whitelist", "employees,test_db,test_db2");
props.put("database.include.list", "employees,test_db,test_db2");
props.put("single.threaded", true);

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ public void persistRecords(List<ClickHouseStruct> records) {
//throw new RuntimeException(e);
log.error("Error marking records as processed"+ e);
}

if(record.isLastRecordInBatch()) {
try {
record.getCommitter().markBatchFinished();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
} catch(Exception e) {
Expand Down

0 comments on commit 7203f57

Please sign in to comment.