Skip to content

Commit

Permalink
Merge pull request #492 from Altinity/473-add-replicatedreplacingmerg…
Browse files Browse the repository at this point in the history
…etree-table-engine-to-sink-connector

Added logic to auto create RRMT tables
  • Loading branch information
subkanthi authored Mar 11, 2024
2 parents 12fd0db + 090b3d2 commit 9551549
Show file tree
Hide file tree
Showing 18 changed files with 757 additions and 36 deletions.
11 changes: 11 additions & 0 deletions sink-connector-lightweight/clickhouse/config.xml
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
<!-- ClickHouse server configuration override: sets the server timezone, a ZooKeeper endpoint and the replica/shard macros. -->
<!-- NOTE(review): replace="true" presumably makes this element replace (rather than merge with) the base config; confirm against the ClickHouse configuration docs. -->
<clickhouse replace="true">
<timezone>America/Chicago</timezone>
<!-- Single ZooKeeper node, addressed by the host name "zookeeper" on port 2181. -->
<zookeeper>
<node index="1">
<host>zookeeper</host>
<port>2181</port>
</node>
<!-- ZooKeeper session timeout in milliseconds, as the element name states. -->
<session_timeout_ms>15000</session_timeout_ms>
</zookeeper>
<!-- Values for the "replica" and "shard" macros. -->
<!-- NOTE(review): presumably these fill the {replica} and {shard} placeholders of auto-created ReplicatedReplacingMergeTree tables; confirm. -->
<macros>
<replica>clickhouse</replica>
<shard>02</shard>
</macros>
</clickhouse>
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
}

}

try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
}
StringBuffer clickHouseQuery = new StringBuffer();
AtomicBoolean isDropOrTruncate = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
if(columnNames.contains(isDeletedColumn)) {
isDeletedColumn = "__" + IS_DELETED_COLUMN;
}

// Check if the destination is ReplicatedReplacingMergeTree.
boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables
.AUTO_CREATE_TABLES_REPLICATED.toString());

if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
Expand All @@ -113,10 +118,16 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr

this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");

if(isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if (isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
if(partitionByColumn.length() > 0) {
this.query.append(Constants.PARTITION_BY).append(" ").append(partitionByColumn);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
public class ReplicatedRMTClickHouse22TIT {
protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
.withExposedPorts(2181).withAccessToHost(true);

@BeforeEach
public void startContainers() throws InterruptedException {

Network network = Network.newNetwork();
zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
zookeeperContainer.start();

mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("data_types_test.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);

clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:22.3")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
.withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
.withExposedPorts(8123)
.waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
clickHouseContainer.withNetwork(network);
clickHouseContainer.start();
}


@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "false");


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});


Thread.sleep(30000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);

ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");

Check failure on line 110 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

ReplicatedRMTClickHouse22TIT.testReplicatedRMTAutoCreate{String}[1]

Code: 390. DB::Exception: Table `string_types_MEDIUMTEXT_utf8mb4` doesn't exist. (CANNOT_GET_CREATE_TABLE_QUERY) (version 22.3.20.29 (official build)) , server ClickHouseNode [uri=http://localhost:32828/employees, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=client_1}]@1704399411
Raw output
java.sql.BatchUpdateException: 
Code: 390. DB::Exception: Table `string_types_MEDIUMTEXT_utf8mb4` doesn't exist. (CANNOT_GET_CREATE_TABLE_QUERY) (version 22.3.20.29 (official build))
, server ClickHouseNode [uri=http://localhost:32828/employees, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=client_1}]@1704399411
	at com.altinity.clickhouse.debezium.embedded.ReplicatedRMTClickHouse22TIT.testReplicatedRMTAutoCreate(ReplicatedRMTClickHouse22TIT.java:110)
// Validate that all the tables are created.
boolean resultValidated = false;
while(rs.next()) {
resultValidated = true;
String createTableDML = rs.getString(1);
System.out.println(createTableDML);
assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
}

Assert.assertTrue(resultValidated);

boolean dataValidated = false;
// Validate temporal_types_DATETIME data.
ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");

while(dateTimeResult.next()) {
dataValidated = true;
System.out.println(dateTimeResult.getString("Type").toString());
System.out.println(dateTimeResult.getString("Value").toString());

Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
}
Assert.assertTrue(dataValidated);

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Integration test that validates auto creation of Replicated RMT when the flag is set in config(auto.create.replicated.tables)")
public class ReplicatedRMTDDLClickHouse22TIT {
protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

static GenericContainer zookeeperContainer = new GenericContainer(DockerImageName.parse("zookeeper:3.6.2"))
.withExposedPorts(2181).withAccessToHost(true);

@BeforeEach
public void startContainers() throws InterruptedException {

Network network = Network.newNetwork();
zookeeperContainer.withNetwork(network).withNetworkAliases("zookeeper");
zookeeperContainer.start();

mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("data_types_test.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);

clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:22.3")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withClasspathResourceMapping("config_replicated.xml", "/etc/clickhouse-server/config.d/config.xml", BindMode.READ_ONLY)
.withClasspathResourceMapping("macros.xml", "/etc/clickhouse-server/config.d/macros.xml", BindMode.READ_ONLY)
.withExposedPorts(8123)
.waitingFor(new HttpWaitStrategy().forPort(zookeeperContainer.getFirstMappedPort()));
clickHouseContainer.withNetwork(network);
clickHouseContainer.start();
}


@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
props.setProperty(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES.toString(), "true");
props.setProperty(SinkConnectorLightWeightConfig.DISABLE_DDL, "true");


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});


Thread.sleep(30000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);

ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");
// Validate that all the tables are created.
boolean resultValidated = false;
while(rs.next()) {
resultValidated = true;
String createTableDML = rs.getString(1);
System.out.println(createTableDML);
assert(createTableDML.contains("ReplicatedReplacingMergeTree"));
}

Assert.assertTrue(resultValidated);

boolean dataValidated = false;
// Validate temporal_types_DATETIME data.
ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from string_types_MEDIUMTEXT_utf8mb4");

while(dateTimeResult.next()) {
dataValidated = true;
System.out.println(dateTimeResult.getString("Type").toString());
System.out.println(dateTimeResult.getString("Value").toString());

Assert.assertTrue(dateTimeResult.getString("Type").toString().equalsIgnoreCase("mediumtext"));
Assert.assertTrue(dateTimeResult.getString("Value").toString().equalsIgnoreCase("????"));
}
Assert.assertTrue(dataValidated);

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}

}
Loading

0 comments on commit 9551549

Please sign in to comment.