Skip to content

Commit

Permalink
Merge pull request #552 from Altinity/523-handle-scenario-when-record…
Browse files Browse the repository at this point in the history
…s-could-be-inserted-with-the-same-timestampnon-gtid-mode

523 handle scenario when records could be inserted with the same timestampnon gtid mode
  • Loading branch information
subkanthi authored Apr 23, 2024
2 parents d5e87e8 + 302ced9 commit e6db8f2
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ public void setupDebeziumEventCapture(Properties props, DebeziumRecordParserServ
try {
DebeziumEngine.Builder<ChangeEvent<SourceRecord, SourceRecord>> changeEventBuilder = DebeziumEngine.create(Connect.class);
changeEventBuilder.using(props);
final boolean[] initialBootstrap = {true};
changeEventBuilder.notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
@Override
public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list,
Expand All @@ -535,8 +536,10 @@ public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list,
batch.add(chStruct);
}
}
// Add sequence number.
addVersion(batch);
// Add sequence number.
addVersion(batch, initialBootstrap[0]);
initialBootstrap[0] = false;


if(batch.size() > 0) {
appendToRecords(batch);
Expand Down Expand Up @@ -728,16 +731,21 @@ private void appendToRecords(List<ClickHouseStruct> convertedRecords) {

}

public static final long SEQUENCE_START = 1000000000;
/**
* Function to add version to every record.
* @param chStructs
*/
public static void addVersion(List<ClickHouseStruct> chStructs) {
public static void addVersion(List<ClickHouseStruct> chStructs, boolean initialSeed) {

// Start the sequence from 1 million and increment for every record
// and reset the sequence back to 1 million in the next second
long sequenceStartTime = System.currentTimeMillis();
long sequence = 1000000;
long sequence = SEQUENCE_START;
if(initialSeed) {
// Add 500 million to the sequence
sequence += 500000000;
}
for(ClickHouseStruct chStruct: chStructs) {

// if the current time is greater than the next second, reset the sequence
Expand All @@ -747,7 +755,7 @@ public static void addVersion(List<ClickHouseStruct> chStructs) {
long currentTime = System.currentTimeMillis();
long diff = (currentTime - sequenceStartTime) / 1000;
if(diff >= 1) {
sequence = 1000000;
sequence = SEQUENCE_START;
sequenceStartTime = currentTime;
} else {
sequence++;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,57 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.AppInjector;
import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.model.DBCredentials;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.google.common.collect.Maps;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;

import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;
import static org.junit.Assert.assertTrue;

import org.apache.log4j.BasicConfigurator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Testcontainers
public class DebeziumChangeEventCaptureIT extends DDLBaseIT {
public class DebeziumChangeEventCaptureIT{

private static final Logger log = LoggerFactory.getLogger(DebeziumChangeEventCaptureIT.class);
@Test
@Disabled
public void testDeleteOffsetStorageRow2() {
//System.out.println("Delete offset");
DebeziumChangeEventCapture dec = new DebeziumChangeEventCapture();
try {
Properties props = getDebeziumProperties();
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("name", "altinity_sink_connector");
Map<String, String> propertiesMap = Maps.newHashMap(Maps.fromProperties(props));
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(propertiesMap);
Expand Down Expand Up @@ -55,4 +81,116 @@ public void testDeleteOffsetStorageRow2() {
}
}

protected MySQLContainer mySqlContainer;

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_schema_only_column_timezone.sql")
// .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);
@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
// .withInitScript("15k_tables_mysql.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(35000);
}

@Test
@Disabled
@DisplayName("Test that validates that the sequence number that is created in non-gtid mode is incremented correctly.")
public void testIncrementingSequenceNumbers() throws Exception {

Injector injector = Guice.createInjector(new AppInjector());

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("snapshot.mode", "schema_only");
props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");

// Override clickhouse server timezone.
ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
throw new RuntimeException(e);
}

});

Thread.sleep(25000);

// Using MySQL.
// 1. Insert multiple records.
// Get connection to MySQL.

Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table `newtable`(col1 varchar(255) not null, col2 int, col3 int, primary key(col1))").execute();

// Insert multiple rows.
conn.prepareStatement("insert into newtable values('a', 1, 1)").execute();
conn.prepareStatement("insert into newtable values('b', 2, 2)").execute();
conn.prepareStatement("insert into newtable values('c', 3, 3)").execute();
conn.prepareStatement("insert into newtable values('d', 4, 4)").execute();

Thread.sleep(10000);

// Create connection to ClickHouse and get the version numbers.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);

long version1 = 1L;
long version2 = 1L;
long version3 = 1L;
long version4 = 1L;

ResultSet version1Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'a'");
while(version1Result.next()) {
version1 = version1Result.getLong("_version");
}

ResultSet version2Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'b'");
while(version2Result.next()) {
version2 = version2Result.getLong("_version");
}

ResultSet version3Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'c'");
while(version3Result.next()) {
version3 = version3Result.getLong("_version");
}

ResultSet version4Result = writer.executeQueryWithResultSet("select _version from newtable where col1 = 'd'");
while(version4Result.next()) {
version4 = version4Result.getLong("_version");
}

// Check if version 4 is greater than version 3
assertTrue(version4 > version3);
// Check if version 3 is greater than version 2
assertTrue(version3 > version2);
// Check if version 2 is greater than version 1
assertTrue(version2 > version1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt

// Make a list of ch1, ch2, ch3 and ch4
List<ClickHouseStruct> clickHouseStructs = Arrays.asList(ch1, ch2, ch3, ch4, ch5);
DebeziumChangeEventCapture.addVersion(clickHouseStructs);
DebeziumChangeEventCapture.addVersion(clickHouseStructs, true);

Thread.sleep(1000);
// Add ch5 and ch6
List<ClickHouseStruct> clickHouseStructs2 = Arrays.asList(ch5, ch6);
DebeziumChangeEventCapture.addVersion(clickHouseStructs2);
DebeziumChangeEventCapture.addVersion(clickHouseStructs2, false);

// Check if the sequence numbers are unique
assertTrue(clickHouseStructs.get(0).getSequenceNumber() != clickHouseStructs.get(1).getSequenceNumber());
Expand All @@ -105,7 +105,7 @@ public void shouldAssignUniqueSequenceNumbersWithinSameSecond() throws Interrupt
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() != clickHouseStructs2.get(1).getSequenceNumber());

// Reset works.
assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001);
// assertTrue(clickHouseStructs2.get(0).getSequenceNumber() == 1000001);
// DebeziumChangeEventCapture.addVersion();
}

Expand Down

0 comments on commit e6db8f2

Please sign in to comment.