diff --git a/sink-connector-lightweight/clickhouse/config.xml b/sink-connector-lightweight/clickhouse/config.xml
index 8bb196cac..a94e439cd 100644
--- a/sink-connector-lightweight/clickhouse/config.xml
+++ b/sink-connector-lightweight/clickhouse/config.xml
@@ -1,3 +1,14 @@
America/Chicago
+
+
+ zookeeper
+ 2181
+
+ 15000
+
+
+ clickhouse
+ 02
+
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index 2d9c09887..75bfdd4b9 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -97,13 +97,15 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
- try {
- String clickHouseVersion = writer.getClickHouseVersion();
- isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
- .checkIfNewReplacingMergeTree(clickHouseVersion);
- } catch (Exception e) {
- log.error("Error retrieving version");
- }
+
+ }
+
+ try {
+ String clickHouseVersion = writer.getClickHouseVersion();
+ isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
+ .checkIfNewReplacingMergeTree(clickHouseVersion);
+ } catch (Exception e) {
+ log.error("Error retrieving version");
}
StringBuffer clickHouseQuery = new StringBuffer();
AtomicBoolean isDropOrTruncate = new AtomicBoolean(false);
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
index f12f75a19..0b5e1215d 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
@@ -103,6 +103,11 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
if(columnNames.contains(isDeletedColumn)) {
isDeletedColumn = "__" + IS_DELETED_COLUMN;
}
+
+ // Check if the destination is ReplicatedReplacingMergeTree.
+ boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables
+ .AUTO_CREATE_TABLES_REPLICATED.toString());
+
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
@@ -113,10 +118,16 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
- this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
- } else
- this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
-
+ if(isReplicatedReplacingMergeTree == true) {
+ this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
+ } else
+ this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
+ } else {
+ if (isReplicatedReplacingMergeTree == true) {
+ this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
+ } else
+ this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
+ }
if(partitionByColumn.length() > 0) {
this.query.append(Constants.PARTITION_BY).append(" ").append(partitionByColumn);
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
new file mode 100644
index 000000000..dbce8a0e1
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
@@ -0,0 +1,143 @@
+package com.altinity.clickhouse.debezium.embedded;
+
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Testcontainers
+@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
+public class ReplicatedRMTClickHouse22TIT {
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
+ .withExposedPorts(2181).withAccessToHost(true);
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+
+ Network network = Network.newNetwork();
+ zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
+ zookeeperContainer.start();
+
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("data_types_test.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ // clickHouseContainer.start();
+ Thread.sleep(15000);
+
+ clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:22.3")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
+ .withExposedPorts(8123)
+ .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
+ clickHouseContainer.withNetwork(network);
+ clickHouseContainer.start();
+ }
+
+
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:22.3"
+ })
+ @DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
+ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {
+
+ AtomicReference engine = new AtomicReference<>();
+
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "false");
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(props, new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ Thread.sleep(30000);
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
+ ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);
+
+ ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");
+ // Validate that all the tables are created.
+ boolean resultValidated = false;
+ while(rs.next()) {
+ resultValidated = true;
+ String createTableDML = rs.getString(1);
+ System.out.println(createTableDML);
+ assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
+ }
+
+ Assert.assertTrue(resultValidated);
+
+ boolean dataValidated = false;
+ // Validate temporal_types_DATETIME data.
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");
+
+ while(dateTimeResult.next()) {
+ dataValidated = true;
+ System.out.println(dateTimeResult.getString("Type").toString());
+ System.out.println(dateTimeResult.getString("Value").toString());
+
+ Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
+ Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
+ }
+ Assert.assertTrue(dataValidated);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ // Files.deleteIfExists(tmpFilePath);
+ executorService.shutdown();
+ }
+
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java
new file mode 100644
index 000000000..7f6cfb7e5
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java
@@ -0,0 +1,145 @@
+package com.altinity.clickhouse.debezium.embedded;
+
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Testcontainers
+@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
+public class ReplicatedRMTDDLClickHouse22TIT {
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
+ .withExposedPorts(2181).withAccessToHost(true);
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+
+ Network network = Network.newNetwork();
+ zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
+ zookeeperContainer.start();
+
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("data_types_test.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ // clickHouseContainer.start();
+ Thread.sleep(15000);
+
+ clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:22.3")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
+ .withExposedPorts(8123)
+ .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
+ clickHouseContainer.withNetwork(network);
+ clickHouseContainer.start();
+ }
+
+
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:22.3"
+ })
+ @DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
+ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {
+
+ AtomicReference engine = new AtomicReference<>();
+
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "true");
+ props.setProperty(SinkConnectorLightWeightConfig.DISABLE_DDL, "true");
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(props, new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ Thread.sleep(30000);
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
+ ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);
+
+ ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");
+ // Validate that all the tables are created.
+ boolean resultValidated = false;
+ while(rs.next()) {
+ resultValidated = true;
+ String createTableDML = rs.getString(1);
+ System.out.println(createTableDML);
+ assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
+ }
+
+ Assert.assertTrue(resultValidated);
+
+ boolean dataValidated = false;
+ // Validate temporal_types_DATETIME data.
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");
+
+ while(dateTimeResult.next()) {
+ dataValidated = true;
+ System.out.println(dateTimeResult.getString("Type").toString());
+ System.out.println(dateTimeResult.getString("Value").toString());
+
+ Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
+ Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
+ }
+ Assert.assertTrue(dataValidated);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ // Files.deleteIfExists(tmpFilePath);
+ executorService.shutdown();
+ }
+
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java
new file mode 100644
index 000000000..51d524904
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java
@@ -0,0 +1,148 @@
+package com.altinity.clickhouse.debezium.embedded;
+
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Testcontainers
+@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
+public class ReplicatedRMTDDLIT {
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
+ .withExposedPorts(2181).withAccessToHost(true);
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+
+ Network network = Network.newNetwork();
+ zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
+ zookeeperContainer.start();
+
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("data_types_test.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ // clickHouseContainer.start();
+ Thread.sleep(15000);
+
+ clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
+ .withExposedPorts(8123)
+ .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
+ clickHouseContainer.withNetwork(network);
+ clickHouseContainer.start();
+ }
+
+ static {
+
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:latest"
+ })
+ @DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
+ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {
+
+ AtomicReference engine = new AtomicReference<>();
+
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "true");
+ props.setProperty(SinkConnectorLightWeightConfig.DISABLE_DDL, "true");
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(props, new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ Thread.sleep(30000);
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
+ ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);
+
+ ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");
+ // Validate that all the tables are created.
+ boolean resultValidated = false;
+ while(rs.next()) {
+ resultValidated = true;
+ String createTableDML = rs.getString(1);
+ System.out.println(createTableDML);
+ assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
+ }
+
+ Assert.assertTrue(resultValidated);
+
+ boolean dataValidated = false;
+ // Validate temporal_types_DATETIME data.
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");
+
+ while(dateTimeResult.next()) {
+ dataValidated = true;
+ System.out.println(dateTimeResult.getString("Type").toString());
+ System.out.println(dateTimeResult.getString("Value").toString());
+
+ Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
+ Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
+ }
+ Assert.assertTrue(dataValidated);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ // Files.deleteIfExists(tmpFilePath);
+ executorService.shutdown();
+ }
+
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java
new file mode 100644
index 000000000..e0fe862c0
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java
@@ -0,0 +1,147 @@
+package com.altinity.clickhouse.debezium.embedded;
+
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Testcontainers
+@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
+public class ReplicatedRMTIT {
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
+ .withExposedPorts(2181).withAccessToHost(true);
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+
+ Network network = Network.newNetwork();
+ zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
+ zookeeperContainer.start();
+
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("data_types_test.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ // clickHouseContainer.start();
+ Thread.sleep(15000);
+
+ clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
+ .withExposedPorts(8123)
+ .waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
+ clickHouseContainer.withNetwork(network);
+ clickHouseContainer.start();
+ }
+
+ static {
+
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:latest"
+ })
+ @DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
+ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {
+
+ AtomicReference engine = new AtomicReference<>();
+
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "false");
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(props, new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ Thread.sleep(30000);
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
+ ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);
+
+ ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");
+ // Validate that all the tables are created.
+ boolean resultValidated = false;
+ while(rs.next()) {
+ resultValidated = true;
+ String createTableDML = rs.getString(1);
+ System.out.println(createTableDML);
+ assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
+ }
+
+ Assert.assertTrue(resultValidated);
+
+ boolean dataValidated = false;
+ // Validate temporal_types_DATETIME data.
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");
+
+ while(dateTimeResult.next()) {
+ dataValidated = true;
+ System.out.println(dateTimeResult.getString("Type").toString());
+ System.out.println(dateTimeResult.getString("Value").toString());
+
+ Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
+ Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
+ }
+ Assert.assertTrue(dataValidated);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ // Files.deleteIfExists(tmpFilePath);
+ executorService.shutdown();
+ }
+
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
index e8cd71e93..560784d37 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
@@ -673,6 +673,24 @@ public void checkIfDropOrTruncate(String sql, boolean expectedResult) {
}
+ @Test
+ public void testReplicatedReplacingMergeTreeWithoutIsDeletedColumn() {
+ HashMap configMap = new HashMap();
+ configMap.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
+ ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
+ MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config);
+ StringBuffer clickHouseQuery = new StringBuffer();
+ AtomicBoolean isDropOrTruncate = new AtomicBoolean();
+
+ String sql = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;";
+ mySQLDDLParserService.parseSql(sql, "temporal_types_DATETIME4", clickHouseQuery, isDropOrTruncate);
+
+ String expectedResult = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_DATETIME4', '{replica}', _version, is_deleted) ORDER BY tuple()";
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult));
+
+
+ }
+
@ParameterizedTest
@CsvSource(
value = {"CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
diff --git a/sink-connector-lightweight/src/test/resources/config_replicated.xml b/sink-connector-lightweight/src/test/resources/config_replicated.xml
new file mode 100644
index 000000000..381d1a97d
--- /dev/null
+++ b/sink-connector-lightweight/src/test/resources/config_replicated.xml
@@ -0,0 +1,21 @@
+
+
+
+ users.xml
+
+
+ /var/lib/clickhouse/access/
+
+
+ America/Chicago
+
+
+ zookeeper
+ 2181
+
+
+
+ clickhouse
+ 02
+
+
\ No newline at end of file
diff --git a/sink-connector-lightweight/src/test/resources/macros.xml b/sink-connector-lightweight/src/test/resources/macros.xml
new file mode 100644
index 000000000..75d04d1a3
--- /dev/null
+++ b/sink-connector-lightweight/src/test/resources/macros.xml
@@ -0,0 +1,6 @@
+
+
+ clickhouse
+ 02
+
+
\ No newline at end of file
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
index 66873d182..693f4f9db 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
@@ -283,6 +283,17 @@ static ConfigDef newConfigDef() {
1,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString())
+ .define(
+ ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(),
+ Type.BOOLEAN,
+ false,
+ Importance.HIGH,
+ "If enabled, ReplicatedReplacingMergeTree tables are created in ClickHouse",
+ CONFIG_GROUP_CONNECTOR_CONFIG,
+ 1,
+ ConfigDef.Width.NONE,
+ ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString())
+
.define(
ClickHouseSinkConnectorConfigVariables.ENABLE_SCHEMA_EVOLUTION.toString(),
Type.BOOLEAN,
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
index 081545bf2..bdd6224f4 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
@@ -53,6 +53,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
//Config variable for auto creating tables if they dont exist.
AUTO_CREATE_TABLES("auto.create.tables"),
+ // Config variable for auto creating ReplicatedReplacingMergeTree
+ AUTO_CREATE_TABLES_REPLICATED("auto.create.tables.replicated"),
+
// Config variable when set to true, columns will be added.
ENABLE_SCHEMA_EVOLUTION("schema.evolution"),
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
index 0118a371f..b62dc5ce9 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
@@ -170,8 +170,10 @@ public String getVersionColumnForReplacingMergeTree(String createDML) {
String parameters = StringUtils.substringBetween(createDML, REPLICATED_REPLACING_MERGE_TREE_VER_PREFIX, ")");
if(parameters != null) {
String[] parameterArray = parameters.split(",");
- if(parameterArray != null && parameterArray.length >= 3) {
+ if(parameterArray != null && parameterArray.length == 3) {
versionColumn = parameterArray[2].trim();
+ } else if(parameterArray != null && parameterArray.length == 4) {
+ versionColumn = parameterArray[2].trim() + "," + parameterArray[3].trim();
}
}
}
@@ -229,12 +231,17 @@ public MutablePair getEngineFromResponse(String response)
if(response.contains(TABLE_ENGINE.COLLAPSING_MERGE_TREE.engine)) {
result.left = TABLE_ENGINE.COLLAPSING_MERGE_TREE;
result.right = getSignColumnForCollapsingMergeTree(response);
- } else if(response.contains(TABLE_ENGINE.REPLACING_MERGE_TREE.engine)) {
+ }
+ else if(response.contains(TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.engine)) {
+ result.left = TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE;
+ result.right = getVersionColumnForReplacingMergeTree(response);
+ }
+ else if(response.contains(TABLE_ENGINE.REPLACING_MERGE_TREE.engine)) {
result.left = TABLE_ENGINE.REPLACING_MERGE_TREE;
result.right = getVersionColumnForReplacingMergeTree(response);
} else if(response.contains(TABLE_ENGINE.MERGE_TREE.engine)) {
result.left = TABLE_ENGINE.MERGE_TREE;
- } else {
+ } else {
result.left = TABLE_ENGINE.DEFAULT;
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
index 5e62d523a..88363167b 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
@@ -101,7 +101,14 @@ public DbWriter(
this.engine = response.getLeft();
long taskId = this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
-
+ boolean isNewReplacingMergeTreeEngine = false;
+ try {
+ String clickHouseVersion = this.getClickHouseVersion();
+ isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
+ .checkIfNewReplacingMergeTree(clickHouseVersion);
+ } catch (Exception e) {
+ log.error("Error retrieving ClickHouse version");
+ }
//ToDO: Is this a reliable way of checking if the table exists already.
if (this.engine == null) {
if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString())) {
@@ -114,8 +121,10 @@ public DbWriter(
} else if(record.getBeforeStruct() != null) {
fields = record.getBeforeStruct().schema().fields().toArray(new Field[0]);
}
-
- act.createNewTable(record.getPrimaryKey(), tableName, fields, this.conn);
+ boolean useReplicatedReplacingMergeTree = this.config.getBoolean(
+ ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString());
+ act.createNewTable(record.getPrimaryKey(), tableName, fields, this.conn,
+ isNewReplacingMergeTreeEngine, useReplicatedReplacingMergeTree);
} catch (Exception e) {
log.error("**** Error creating table ***" + tableName, e);
}
@@ -128,7 +137,9 @@ public DbWriter(
this.engine = response.getLeft();
}
- if (this.engine != null && this.engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine())) {
+ if (this.engine != null &&
+ (this.engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine()) ||
+ this.engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.getEngine()))) {
String rmtColumns = response.getRight();
if(rmtColumns != null && rmtColumns.contains(",")) {
// New RMT, with version and deleted column.
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
index c4a31bb7d..6b14e206b 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
@@ -224,7 +224,7 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P
//String colName = entry.getKey();
- //ToDO: Setting null to a non-nullable field
+ //ToDO: Setting null to a non-nullable field)
// will throw an error.
// If the Received column is not a clickhouse column
try {
@@ -301,7 +301,10 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P
// Version column.
//String versionColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TABLE_VERSION_COLUMN);
- if(engine != null && engine.getEngine() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine() && versionColumn != null) {
+ if(engine != null &&
+ (engine.getEngine() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine() ||
+ engine.getEngine() == DBMetadata.TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.getEngine())
+ && versionColumn != null) {
if (columnNameToDataTypeMap.containsKey(versionColumn)) {
if(columnNameToIndexMap.containsKey(versionColumn)) {
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
index b8449fa1a..ebda3857b 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
@@ -24,9 +24,12 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{
private static final Logger log = LoggerFactory.getLogger(ClickHouseAutoCreateTable.class.getName());
- public void createNewTable(ArrayList primaryKey, String tableName, Field[] fields, ClickHouseConnection connection) throws SQLException {
+ public void createNewTable(ArrayList primaryKey, String tableName, Field[] fields,
+ ClickHouseConnection connection, boolean isNewReplacingMergeTree,
+ boolean useReplicatedReplacingMergeTree) throws SQLException {
Map colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields);
- String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap);
+ String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap,
+ isNewReplacingMergeTree, useReplicatedReplacingMergeTree);
log.info("**** AUTO CREATE TABLE " + createTableQuery);
// ToDO: need to run it before a session is created.
this.runQuery(createTableQuery, connection);
@@ -39,7 +42,10 @@ public void createNewTable(ArrayList primaryKey, String tableName, Field
* @param columnToDataTypesMap
* @return CREATE TABLE query
*/
- public java.lang.String createTableSyntax(ArrayList primaryKey, String tableName, Field[] fields, Map columnToDataTypesMap) {
+ public java.lang.String createTableSyntax(ArrayList primaryKey, String tableName, Field[] fields,
+ Map columnToDataTypesMap,
+ boolean isNewReplacingMergeTreeEngine,
+ boolean useReplicatedReplacingMergeTree) {
StringBuilder createTableSyntax = new StringBuilder();
@@ -69,18 +75,31 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t
createTableSyntax.append(",");
}
-// for(Map.Entry entry: columnToDataTypesMap.entrySet()) {
-// createTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
-// }
- //createTableSyntax.deleteCharAt(createTableSyntax.lastIndexOf(","));
- // Append sign and version columns
- createTableSyntax.append("`").append(SIGN_COLUMN).append("` ").append(SIGN_COLUMN_DATA_TYPE).append(",");
- createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
+ String isDeletedColumn = IS_DELETED_COLUMN;
+ if(isNewReplacingMergeTreeEngine == true) {
+ createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
+ createTableSyntax.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
+ } else {
+ // Append sign and version columns
+ createTableSyntax.append("`").append(SIGN_COLUMN).append("` ").append(SIGN_COLUMN_DATA_TYPE).append(",");
+ createTableSyntax.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
+ }
createTableSyntax.append(")");
createTableSyntax.append(" ");
- createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
+
+ if(isNewReplacingMergeTreeEngine == true ){
+ if(useReplicatedReplacingMergeTree == true) {
+ createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
+ } else
+ createTableSyntax.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
+ } else {
+ if(useReplicatedReplacingMergeTree == true) {
+ createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
+ } else
+ createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
+ }
createTableSyntax.append(" ");
if(primaryKey != null && isPrimaryKeyColumnPresent(primaryKey, columnToDataTypesMap)) {
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
index f7f056e04..0a08f8319 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
@@ -93,14 +93,25 @@ public void testGetEngineFromResponse() {
Assert.assertTrue(replacingMergeTreeResult.getRight().equalsIgnoreCase("ver"));
Assert.assertTrue(replacingMergeTreeResult.getLeft().getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine()));
+ String replacingMergeTreeWIsDeletedColumn = "ReplacingMergeTree(ver, is_deleted) PRIMARY KEY dept_no ORDER BY dept_no SETTINGS index_granularity = 8192";
+ MutablePair replacingMergeTreeWIsDeletedColumnResult = new DBMetadata().getEngineFromResponse(replacingMergeTreeWIsDeletedColumn);
+
+ Assert.assertTrue(replacingMergeTreeWIsDeletedColumnResult.getRight().equalsIgnoreCase("ver, is_deleted"));
+ Assert.assertTrue(replacingMergeTreeWIsDeletedColumnResult.getLeft().getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine()));
String replicatedReplacingMergeTree = "ReplicatedReplacingMergeTree('/clickhouse/{cluster}/tables/dashboard_mysql_replication/favourite_products', '{replica}', ver) ORDER BY id SETTINGS allow_nullable_key = 1, index_granularity = 8192";
MutablePair replicatedReplacingMergeTreeResult = new DBMetadata().getEngineFromResponse(replicatedReplacingMergeTree);
Assert.assertTrue(replicatedReplacingMergeTreeResult.getRight().equalsIgnoreCase("ver"));
- Assert.assertTrue(replicatedReplacingMergeTreeResult.getLeft().getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine()));
+ Assert.assertTrue(replicatedReplacingMergeTreeResult.getLeft().getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.getEngine()));
+
+
+ String replicatedReplacingMergeTreeWIsDeletedColumn = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_DATETIME4', '{replica}', _version, is_deleted) ORDER BY tuple()";
+ MutablePair replicatedReplacingMergeTreeWIsDeletedColumnResult = new DBMetadata().getEngineFromResponse(replicatedReplacingMergeTreeWIsDeletedColumn);
+ Assert.assertTrue(replicatedReplacingMergeTreeWIsDeletedColumnResult.getRight().equalsIgnoreCase("_version,is_deleted"));
+ Assert.assertTrue(replicatedReplacingMergeTreeWIsDeletedColumnResult.getLeft().getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLICATED_REPLACING_MERGE_TREE.getEngine()));
}
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
index 430e95f37..09d4fa70b 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
@@ -124,7 +124,8 @@ public void testCreateTableSyntax() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
- String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
+ String query = act.createTableSyntax(primaryKeys, "auto_create_table",
+ createFields(), this.columnToDataTypesMap, false, false);
System.out.println("QUERY" + query);
Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY(customerName)"));
//Assert.assertTrue(query.equalsIgnoreCase("CREATE TABLE auto_create_table(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) PRIMARY KEY(customerName) ORDER BY (customerName)"));
@@ -135,7 +136,8 @@ public void testCreateTableEmptyPrimaryKey() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
- String query = act.createTableSyntax(null, "auto_create_table", createFields(), this.columnToDataTypesMap);
+ String query = act.createTableSyntax(null, "auto_create_table", createFields(),
+ this.columnToDataTypesMap, false, false);
String expectedQuery = "CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
@@ -148,7 +150,8 @@ public void testCreateTableMultiplePrimaryKeys() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
- String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
+ String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(),
+ this.columnToDataTypesMap, false, false);
String expectedQuery = "CREATE TABLE `auto_create_table`(`customerName` String NOT NULL,`occupation` String NOT NULL,`quantity` Int32 NOT NULL,`amount_1` Float32 NOT NULL,`amount` Float64 NOT NULL,`employed` Bool NOT NULL,`blob_storage` String NOT NULL,`blob_storage_scale` Decimal NOT NULL,`json_output` JSON,`max_amount` Float64 NOT NULL,`_sign` Int8,`_version` UInt64) ENGINE = ReplacingMergeTree(_version) ORDER BY tuple()";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
@@ -177,7 +180,8 @@ public void testCreateNewTable() {
primaryKeys.add("customerName");
try {
- act.createNewTable(primaryKeys, "auto_create_table", this.createFields(), writer.getConnection());
+ act.createNewTable(primaryKeys, "auto_create_table", this.createFields(), writer.getConnection(),
+ false, false);
} catch(SQLException se) {
Assert.assertTrue(false);
}