Skip to content

Commit

Permalink
Merge pull request #651 from Altinity/resolve_2_2_0_conflicts
Browse files Browse the repository at this point in the history
Resolve 2 2 0 conflicts
  • Loading branch information
subkanthi authored Jun 19, 2024
2 parents d392189 + 24abbb6 commit b773ab4
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 10 deletions.
1 change: 1 addition & 0 deletions sink-connector-lightweight/clickhouse/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<session_timeout_ms>15000</session_timeout_ms>
</zookeeper>
<macros>
<cluster>default</cluster>
<replica>clickhouse</replica>
<shard>02</shard>
</macros>
Expand Down
10 changes: 7 additions & 3 deletions sink-connector-lightweight/docker/config_postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# name: Unique name for the connector. Attempting to register again with the same name will fail.
name: "debezium-embedded-postgres"

auto.create.tables.replicated: "true"

# database.hostname: IP address or hostname of the PostgreSQL database server.
database.hostname: "postgres"

Expand All @@ -19,13 +21,15 @@ database.password: "root"
database.server.name: "ER54"

# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
schema.include.list: public
schema.include.list: public,public2

slot.name: connector2

# plugin.name: The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are decoderbufs, and pgoutput.
plugin.name: "pgoutput"

# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
table.include.list: "public.tm,public.tm2"
#table.include.list: "public.tm,public.tm2"

# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "clickhouse"
Expand Down Expand Up @@ -118,4 +122,4 @@ database.dbname: "public"
#disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"
#disable.drop.truncate: "false"
8 changes: 8 additions & 0 deletions sink-connector-lightweight/docker/docker-compose-postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ services:
extends:
file: clickhouse-service.yml
service: clickhouse
depends_on:
zookeeper:
condition: service_healthy

zookeeper:
extends:
file: zookeeper-service.yml
service: zookeeper

clickhouse-sink-connector-lt:
extends:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
if(isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if (isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void startContainers() throws InterruptedException {
@CsvSource({
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree on ClickHouse 22.3 ")
public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ public void testReplicatedReplacingMergeTreeWithoutIsDeletedColumn() {
String sql = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;";
mySQLDDLParserService.parseSql(sql, "temporal_types_DATETIME4", clickHouseQuery, isDropOrTruncate);

String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_TIMESTAMP1', '{replica}', _version, is_deleted) ORDER BY tuple()";
String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY tuple()";
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult));


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

StringBuilder createTableSyntax = new StringBuilder();

createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("(");
createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`");
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(" ON CLUSTER `{cluster}` ");
}

createTableSyntax.append("(");

for(Field f: fields) {
String colName = f.name();
Expand Down Expand Up @@ -96,12 +101,12 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

if(isNewReplacingMergeTreeEngine == true ){
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
createTableSyntax.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
Expand Down

0 comments on commit b773ab4

Please sign in to comment.