Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve 2 2 0 conflicts #651

Merged
merged 10 commits into from
Jun 19, 2024
1 change: 1 addition & 0 deletions sink-connector-lightweight/clickhouse/config.xml
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
<session_timeout_ms>15000</session_timeout_ms>
</zookeeper>
<macros>
<cluster>default</cluster>
<replica>clickhouse</replica>
<shard>02</shard>
</macros>
10 changes: 7 additions & 3 deletions sink-connector-lightweight/docker/config_postgres.yml
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
# name: Unique name for the connector. Attempting to register again with the same name will fail.
name: "debezium-embedded-postgres"

auto.create.tables.replicated: "true"

# database.hostname: IP address or hostname of the PostgreSQL database server.
database.hostname: "postgres"

@@ -19,13 +21,15 @@ database.password: "root"
database.server.name: "ER54"

# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
schema.include.list: public
schema.include.list: public,public2

slot.name: connector2

# plugin.name: The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are decoderbufs, and pgoutput.
plugin.name: "pgoutput"

# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
table.include.list: "public.tm,public.tm2"
#table.include.list: "public.tm,public.tm2"

# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "clickhouse"
@@ -118,4 +122,4 @@ database.dbname: "public"
#disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"
#disable.drop.truncate: "false"
8 changes: 8 additions & 0 deletions sink-connector-lightweight/docker/docker-compose-postgres.yml
Original file line number Diff line number Diff line change
@@ -17,6 +17,14 @@ services:
extends:
file: clickhouse-service.yml
service: clickhouse
depends_on:
zookeeper:
condition: service_healthy

zookeeper:
extends:
file: zookeeper-service.yml
service: zookeeper

clickhouse-sink-connector-lt:
extends:
Original file line number Diff line number Diff line change
@@ -137,12 +137,12 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
if(isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if (isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@
@CsvSource({
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree")
@DisplayName("Test that validates creation of Replicated Replacing Merge Tree on ClickHouse 22.3 ")
public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
@@ -107,7 +107,7 @@
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);

ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4");

Check failure on line 110 in sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java

GitHub Actions / JUnit Test Report

ReplicatedRMTClickHouse22TIT.testReplicatedRMTAutoCreate{String}[1]

Code: 390. DB::Exception: Table `string_types_MEDIUMTEXT_utf8mb4` doesn't exist. (CANNOT_GET_CREATE_TABLE_QUERY) (version 22.3.20.29 (official build)) , server ClickHouseNode [uri=http://localhost:32827/employees, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=client_1}]@-1671910357
Raw output
java.sql.BatchUpdateException: 
Code: 390. DB::Exception: Table `string_types_MEDIUMTEXT_utf8mb4` doesn't exist. (CANNOT_GET_CREATE_TABLE_QUERY) (version 22.3.20.29 (official build))
, server ClickHouseNode [uri=http://localhost:32827/employees, options={custom_settings=allow_experimental_object_type=1,insert_allow_materialized_columns=1,client_name=client_1}]@-1671910357
	at com.altinity.clickhouse.debezium.embedded.ReplicatedRMTClickHouse22TIT.testReplicatedRMTAutoCreate(ReplicatedRMTClickHouse22TIT.java:110)
// Validate that all the tables are created.
boolean resultValidated = false;
while(rs.next()) {
Original file line number Diff line number Diff line change
@@ -708,7 +708,7 @@ public void testReplicatedReplacingMergeTreeWithoutIsDeletedColumn() {
String sql = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;";
mySQLDDLParserService.parseSql(sql, "temporal_types_DATETIME4", clickHouseQuery, isDropOrTruncate);

String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_TIMESTAMP1', '{replica}', _version, is_deleted) ORDER BY tuple()";
String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY tuple()";
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult));


Original file line number Diff line number Diff line change
@@ -50,7 +50,12 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

StringBuilder createTableSyntax = new StringBuilder();

createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("(");
createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`");
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(" ON CLUSTER `{cluster}` ");
}

createTableSyntax.append("(");

for(Field f: fields) {
String colName = f.name();
@@ -96,12 +101,12 @@ public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String t

if(isNewReplacingMergeTreeEngine == true ){
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn));
} else
createTableSyntax.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if(useReplicatedReplacingMergeTree == true) {
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN));
} else
createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
Loading