diff --git a/sink-connector-lightweight/clickhouse/config.xml b/sink-connector-lightweight/clickhouse/config.xml
index a94e439cd..34d1259ae 100644
--- a/sink-connector-lightweight/clickhouse/config.xml
+++ b/sink-connector-lightweight/clickhouse/config.xml
@@ -8,6 +8,7 @@
15000
+ default
clickhouse
02
diff --git a/sink-connector-lightweight/docker/config_postgres.yml b/sink-connector-lightweight/docker/config_postgres.yml
index 491a544b8..4453291a5 100644
--- a/sink-connector-lightweight/docker/config_postgres.yml
+++ b/sink-connector-lightweight/docker/config_postgres.yml
@@ -3,6 +3,8 @@
# name: Unique name for the connector. Attempting to register again with the same name will fail.
name: "debezium-embedded-postgres"
+auto.create.tables.replicated: "true"
+
# database.hostname: IP address or hostname of the PostgreSQL database server.
database.hostname: "postgres"
@@ -19,13 +21,15 @@ database.password: "root"
database.server.name: "ER54"
# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
-schema.include.list: public
+schema.include.list: public,public2
+
+slot.name: connector2
# plugin.name: The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are decoderbufs, and pgoutput.
plugin.name: "pgoutput"
# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
-table.include.list: "public.tm,public.tm2"
+#table.include.list: "public.tm,public.tm2"
# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "clickhouse"
@@ -118,4 +122,4 @@ database.dbname: "public"
#disable.ddl: "false"
#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
-#disable.drop.truncate: "false"
\ No newline at end of file
+#disable.drop.truncate: "false"
diff --git a/sink-connector-lightweight/docker/docker-compose-postgres.yml b/sink-connector-lightweight/docker/docker-compose-postgres.yml
index 484f9a214..587a3db36 100644
--- a/sink-connector-lightweight/docker/docker-compose-postgres.yml
+++ b/sink-connector-lightweight/docker/docker-compose-postgres.yml
@@ -17,6 +17,14 @@ services:
extends:
file: clickhouse-service.yml
service: clickhouse
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+
+ zookeeper:
+ extends:
+ file: zookeeper-service.yml
+ service: zookeeper
clickhouse-sink-connector-lt:
extends:
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
index f309f34dd..6257e27b0 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
@@ -137,12 +137,12 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
if(isReplicatedReplacingMergeTree == true) {
- this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
+ this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if (isReplicatedReplacingMergeTree == true) {
- this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
+ this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
index 55d5a51ea..09b062af7 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java
@@ -76,7 +76,7 @@ public void startContainers() throws InterruptedException {
@CsvSource({
"clickhouse/clickhouse-server:22.3"
})
- @DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
+ @DisplayName("Test that validates creation of Replicated Replacing Merge Tree on ClickHouse 22.3 ")
public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {
AtomicReference engine = new AtomicReference<>();
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
index fe6b39639..25cf6ce0a 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
@@ -708,7 +708,7 @@ public void testReplicatedReplacingMergeTreeWithoutIsDeletedColumn() {
String sql = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;";
mySQLDDLParserService.parseSql(sql, "temporal_types_DATETIME4", clickHouseQuery, isDropOrTruncate);
- String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_TIMESTAMP1', '{replica}', _version, is_deleted) ORDER BY tuple()";
+ String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY tuple()";
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult));
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
index b823932b8..0d8b906ad 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
@@ -50,7 +50,12 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t
StringBuilder createTableSyntax = new StringBuilder();
- createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("(");
+ createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`");
+ if(useReplicatedReplacingMergeTree == true) {
+ createTableSyntax.append(" ON CLUSTER `{cluster}` ");
+ }
+
+ createTableSyntax.append("(");
for(Field f: fields) {
String colName = f.name();
@@ -96,12 +101,12 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t
if(isNewReplacingMergeTreeEngine == true ){
if(useReplicatedReplacingMergeTree == true) {
- createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
+ createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
createTableSyntax.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if(useReplicatedReplacingMergeTree == true) {
- createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
+ createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}