diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index d42b8866c..1770f2fbb 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -518,6 +518,7 @@ public void setupDebeziumEventCapture(Properties props, DebeziumRecordParserServ try { DebeziumEngine.Builder> changeEventBuilder = DebeziumEngine.create(Connect.class); changeEventBuilder.using(props); + final boolean[] initialBootstrap = {true}; changeEventBuilder.notifying(new DebeziumEngine.ChangeConsumer>() { @Override public void handleBatch(List> list, @@ -535,8 +536,10 @@ public void handleBatch(List> list, batch.add(chStruct); } } - // Add sequence number. - addVersion(batch); + // Add sequence number. + addVersion(batch, initialBootstrap[0]); + initialBootstrap[0] = false; + if(batch.size() > 0) { appendToRecords(batch); @@ -728,16 +731,21 @@ private void appendToRecords(List convertedRecords) { } + public static final long SEQUENCE_START = 1000000000; /** * Function to add version to every record. * @param chStructs */ - public static void addVersion(List chStructs) { + public static void addVersion(List chStructs, boolean initialSeed) { // Start the sequence from 1 million and increment for every record // and reset the sequence back to 1 million in the next second long sequenceStartTime = System.currentTimeMillis(); - long sequence = 1000000; + long sequence = SEQUENCE_START; + if(initialSeed) { + // Add 500 million to the sequence + sequence += 500000000; + } for(ClickHouseStruct chStruct: chStructs) { // if the current time is greater than the next second, reset the sequence @@ -747,7 +755,7 @@ public static void addVersion(List chStructs) { long currentTime = System.currentTimeMillis(); long diff = (currentTime - sequenceStartTime) / 1000; if(diff >= 1) { - sequence = 1000000; + sequence = SEQUENCE_START; sequenceStartTime = currentTime; } else { sequence++; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java index 94e9ad21b..0a6961933 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java @@ -1,31 +1,57 @@ package com.altinity.clickhouse.debezium.embedded.cdc; +import com.altinity.clickhouse.debezium.embedded.AppInjector; +import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; import com.altinity.clickhouse.sink.connector.model.DBCredentials; import com.clickhouse.jdbc.ClickHouseConnection; import com.google.common.collect.Maps; +import com.google.inject.Guice; +import com.google.inject.Injector; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; + +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; import static org.junit.Assert.assertTrue; + +import org.apache.log4j.BasicConfigurator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Testcontainers -public class DebeziumChangeEventCaptureIT extends DDLBaseIT { +public class DebeziumChangeEventCaptureIT{ private static final Logger log = LoggerFactory.getLogger(DebeziumChangeEventCaptureIT.class); @Test + @Disabled public void testDeleteOffsetStorageRow2() { //System.out.println("Delete offset"); DebeziumChangeEventCapture dec = new DebeziumChangeEventCapture(); try { - Properties props = getDebeziumProperties(); + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); props.setProperty("name", "altinity_sink_connector"); Map propertiesMap = Maps.newHashMap(Maps.fromProperties(props)); ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(propertiesMap); @@ -55,4 +81,116 @@ public void testDeleteOffsetStorageRow2() { } } + protected MySQLContainer mySqlContainer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_schema_only_column_timezone.sql") + // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") +// .withInitScript("15k_tables_mysql.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(35000); + } + + @Test + @Disabled + @DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly.") + public void testIncrementingSequenceNumbers() throws Exception { + + Injector injector = Guice.createInjector(new AppInjector()); + + Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer); + props.setProperty("snapshot.mode", "schema_only"); + props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true"); + props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true"); + + // Override clickhouse server timezone. + ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); + + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), + injector.getInstance(DDLParserService.class), props, false); + DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() + , new Properties()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + + Thread.sleep(25000); + + // Using MySQL. + // 1. Insert multiple records. + // Get connection to MySQL. + + Connection conn = ITCommon.connectToMySQL(mySqlContainer); + conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute(); + + // Insert multiple rows. + conn.prepareStatement("insert into newtable values('a', 1, 1)").execute(); + conn.prepareStatement("insert into newtable values('b', 2, 2)").execute(); + conn.prepareStatement("insert into newtable values('c', 3, 3)").execute(); + conn.prepareStatement("insert into newtable values('d', 4, 4)").execute(); + + Thread.sleep(10000); + + // Create connection to ClickHouse and get the version numbers. + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + long version1 = 1L; + long version2 = 1L; + long version3 = 1L; + long version4 = 1L; + + ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'a'"); + while(version1Result.next()) { + version1 = version1Result.getLong("_version"); + } + + ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'b'"); + while(version2Result.next()) { + version2 = version2Result.getLong("_version"); + } + + ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'c'"); + while(version3Result.next()) { + version3 = version3Result.getLong("_version"); + } + + ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'd'"); + while(version4Result.next()) { + version4 = version4Result.getLong("_version"); + } + + // Check if version 4 is greater than version 3 + assertTrue(version4 > version3); + // Check if version 3 is greater than version 2 + assertTrue(version3 > version2); + // Check if version 2 is greater than version 1 + assertTrue(version2 > version1); + } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java index 5dce65537..c16336b32 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java @@ -88,12 +88,12 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt // Make a list of ch1, ch2, ch3 and ch4 List clickHouseStructs = Arrays.asList(ch1, ch2, ch3, ch4, ch5); - DebeziumChangeEventCapture.addVersion(clickHouseStructs); + DebeziumChangeEventCapture.addVersion(clickHouseStructs, true); Thread.sleep(1000); // Add ch5 and ch6 List clickHouseStructs2 = Arrays.asList(ch5, ch6); - DebeziumChangeEventCapture.addVersion(clickHouseStructs2); + DebeziumChangeEventCapture.addVersion(clickHouseStructs2, false); // Check if the sequence numbers are unique assertTrue(clickHouseStructs.get(0).getSequenceNumber() != clickHouseStructs.get(1).getSequenceNumber()); @@ -105,7 +105,7 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber()); // Reset works. - assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001); + // assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001); // DebeziumChangeEventCapture.addVersion(); }