From 820656d8e65e0c223a5d3ddee4014dff1f8784bf Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 9 Jun 2024 12:15:48 -0400 Subject: [PATCH 1/6] Removed ZK Path in ReplicatedReplacingMergeTree --- .../embedded/ddl/parser/MySqlDDLParserListenerImpl.java | 4 ++-- .../debezium/embedded/ReplicatedRMTClickHouse22TIT.java | 2 +- .../connector/db/operations/ClickHouseAutoCreateTable.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index 3efa17330..5a15203c6 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -128,12 +128,12 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr this.query.append(")"); if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) { if(isReplicatedReplacingMergeTree == true) { - this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn)); + this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn)); } else this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")"); } else { if (isReplicatedReplacingMergeTree == true) { - this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN)); + this.query.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN)); } else this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")"); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java index 55d5a51ea..09b062af7 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java @@ -76,7 +76,7 @@ public void startContainers() throws InterruptedException { @CsvSource({ "clickhouse/clickhouse-server:22.3" }) - @DisplayName("Test that validates creation of Replicated Replacing Merge Tree") + @DisplayName("Test that validates creation of Replicated Replacing Merge Tree on ClickHouse 22.3 ") public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws Exception { AtomicReference engine = new AtomicReference<>(); diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 41d3f63a6..7104be9bb 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -91,12 +91,12 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t if(isNewReplacingMergeTreeEngine == true ){ if(useReplicatedReplacingMergeTree == true) { - createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn)); + createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s, %s)", VERSION_COLUMN, isDeletedColumn)); } else createTableSyntax.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")"); } else { if(useReplicatedReplacingMergeTree == true) { - createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN)); + createTableSyntax.append(String.format("Engine=ReplicatedReplacingMergeTree(%s)", VERSION_COLUMN)); } else createTableSyntax.append("ENGINE = ReplacingMergeTree(").append(VERSION_COLUMN).append(")"); } From 32b3f85727a6e62a52c8ee168c11c55c737f643b Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 9 Jun 2024 12:26:42 -0400 Subject: [PATCH 2/6] Fixed Integration test for RRMT --- .../embedded/ddl/parser/MySqlDDLParserListenerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index d595be418..4197282f7 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -708,7 +708,7 @@ public void testReplicatedReplacingMergeTreeWithoutIsDeletedColumn() { String sql = "CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` timestamp(1) NOT NULL) ENGINE=InnoDB;"; mySQLDDLParserService.parseSql(sql, "temporal_types_DATETIME4", clickHouseQuery, isDropOrTruncate); - String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/temporal_types_TIMESTAMP1', '{replica}', _version, is_deleted) ORDER BY tuple()"; + String expectedResult = "CREATE TABLE datatypes.temporal_types_TIMESTAMP1 ON CLUSTER `{cluster}`(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8)Engine=ReplicatedReplacingMergeTree(_version, is_deleted) ORDER BY tuple()"; Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult)); From 4570e9480a2694bba846df6c90157421d766ced5 Mon Sep 17 00:00:00 2001 From: selfeer Date: Mon, 10 Jun 2024 13:55:46 +0400 Subject: [PATCH 3/6] fix broken CI/CD --- .../workflows/testflows-sink-connector-kafka.yml | 12 ++++++++++++ .../testflows-sink-connector-lightweight.yml | 12 ++++++++++++ .../tests/integration/requirements.txt | 10 ++++++++-- .../integration/tests/calculated_columns.py | 8 ++++---- .../tests/integration/tests/datatypes.py | 2 +- .../tests/integration/tests/is_deleted.py | 2 +- .../integration/tests/multiple_databases.py | 2 +- .../tests/integration/tests/retry_on_fail.py | 2 +- .../tests/integration/tests/schema_only.py | 2 +- .../tests/integration/tests/sink_cli_commands.py | 2 +- .../tests/integration/tests/steps/alter.py | 16 ++++++++-------- .../tests/integration/tests/steps/mysql.py | 2 +- .../tests/integration/tests/table_names.py | 8 ++++---- .../tests/integration/requirements.txt | 10 ++++++++-- 14 files changed, 63 insertions(+), 27 deletions(-) diff --git a/.github/workflows/testflows-sink-connector-kafka.yml b/.github/workflows/testflows-sink-connector-kafka.yml index 6bbad76eb..7fff61303 100644 --- a/.github/workflows/testflows-sink-connector-kafka.yml +++ b/.github/workflows/testflows-sink-connector-kafka.yml @@ -48,6 +48,18 @@ jobs: working-directory: sink-connector/tests/integration run: echo "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@$(hostname -I | cut -d ' ' -f 1)" + - name: Create a virtual environment + run: | + echo "Install Python modules..." + sudo apt-get clean + sudo apt-get update + sudo apt-get install -y python3.12-venv + + echo "Create and activate Python virtual environment..." + python3 -m venv venv + source venv/bin/activate + echo PATH=$PATH >> $GITHUB_ENV + - name: Install all dependencies working-directory: sink-connector/tests/integration run: pip3 install -r requirements.txt diff --git a/.github/workflows/testflows-sink-connector-lightweight.yml b/.github/workflows/testflows-sink-connector-lightweight.yml index e5bf55461..b954daff6 100644 --- a/.github/workflows/testflows-sink-connector-lightweight.yml +++ b/.github/workflows/testflows-sink-connector-lightweight.yml @@ -64,6 +64,18 @@ jobs: working-directory: sink-connector/tests/integration run: echo "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@$(hostname -I | cut -d ' ' -f 1)" + - name: Create a virtual environment + run: | + echo "Install Python modules..." + sudo apt-get clean + sudo apt-get update + sudo apt-get install -y python3.12-venv + + echo "Create and activate Python virtual environment..." + python3 -m venv venv + source venv/bin/activate + echo PATH=$PATH >> $GITHUB_ENV + - name: Install all dependencies working-directory: sink-connector-lightweight/tests/integration run: pip3 install -r requirements.txt diff --git a/sink-connector-lightweight/tests/integration/requirements.txt b/sink-connector-lightweight/tests/integration/requirements.txt index 869791f58..2e9be8ba7 100644 --- a/sink-connector-lightweight/tests/integration/requirements.txt +++ b/sink-connector-lightweight/tests/integration/requirements.txt @@ -1,4 +1,10 @@ -docker-compose==1.29.2 testflows==2.1.5 +python-dateutil==2.9.0 +numpy==1.26.4 +pyarrow==16.1.0 +pandas==2.2.0 +PyYAML==5.3.1 +docker-compose==1.29.2 awscli==1.27.36 -docker==6.1.3 \ No newline at end of file +docker==6.1.3 +requests==2.31.0 \ No newline at end of file diff --git a/sink-connector-lightweight/tests/integration/tests/calculated_columns.py b/sink-connector-lightweight/tests/integration/tests/calculated_columns.py index c4c93b8f5..a26a9d80e 100644 --- a/sink-connector-lightweight/tests/integration/tests/calculated_columns.py +++ b/sink-connector-lightweight/tests/integration/tests/calculated_columns.py @@ -34,7 +34,7 @@ def string_concatenation(self): f"I create a {table_name} table with calculated column with string concatenation" ): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"first_name VARCHAR(50) NOT NULL,last_name VARCHAR(50) NOT NULL,fullname varchar(101) " f"GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),email VARCHAR(100) NOT NULL", clickhouse_table_engine=self.context.clickhouse_table_engines[0], @@ -61,7 +61,7 @@ def basic_arithmetic_operations(self): with Given(f"I create a {table_name} table with calculated column"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"a INT, b INT, sum_col INT AS (a + b), diff_col INT AS (a - b), prod_col INT AS (a * b), div_col DOUBLE AS (a / b)", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) @@ -88,7 +88,7 @@ def complex_expressions(self): with Given(f"I create a {table_name} table with calculated column"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"base_salary DECIMAL(10,2), bonus_rate DECIMAL(5,2), total_compensation DECIMAL(12,2) AS (base_salary + (base_salary * bonus_rate / 100))", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) @@ -117,7 +117,7 @@ def nested(self): with Given(f"I create a {table_name} table with calculated column"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"a INT, b INT, c INT AS (a + b), d INT AS (c * 2)", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/datatypes.py b/sink-connector-lightweight/tests/integration/tests/datatypes.py index 60bf39a5a..17a763d43 100644 --- a/sink-connector-lightweight/tests/integration/tests/datatypes.py +++ b/sink-connector-lightweight/tests/integration/tests/datatypes.py @@ -36,7 +36,7 @@ def create_table_with_datetime_column(self, table_name, data, precision): with By(f"creating a {table_name} table with datetime column"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"date DATETIME({precision})", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/is_deleted.py b/sink-connector-lightweight/tests/integration/tests/is_deleted.py index 7a072231a..ad85c9b02 100644 --- a/sink-connector-lightweight/tests/integration/tests/is_deleted.py +++ b/sink-connector-lightweight/tests/integration/tests/is_deleted.py @@ -19,7 +19,7 @@ def create_table_with_is_deleted( f"creating a {table_name} table with is_deleted column and {datatype} datatype" ): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"col1 varchar(255), col2 int, {column} {datatype}", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/multiple_databases.py b/sink-connector-lightweight/tests/integration/tests/multiple_databases.py index ddcd94431..c2fc89c08 100644 --- a/sink-connector-lightweight/tests/integration/tests/multiple_databases.py +++ b/sink-connector-lightweight/tests/integration/tests/multiple_databases.py @@ -99,7 +99,7 @@ def create_table_and_insert_values( with By("creating a sample table in MySQL"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"col1 varchar(255), col2 int", database_name=database_name, clickhouse_table_engine=self.context.clickhouse_table_engines[0], diff --git a/sink-connector-lightweight/tests/integration/tests/retry_on_fail.py b/sink-connector-lightweight/tests/integration/tests/retry_on_fail.py index 3792a9961..8f3f1ca79 100644 --- a/sink-connector-lightweight/tests/integration/tests/retry_on_fail.py +++ b/sink-connector-lightweight/tests/integration/tests/retry_on_fail.py @@ -16,7 +16,7 @@ def retry_on_fail(self): with When("I creat a table in MySQL"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"retry VARCHAR(16)", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/schema_only.py b/sink-connector-lightweight/tests/integration/tests/schema_only.py index 074eb4a5c..5714ba822 100644 --- a/sink-connector-lightweight/tests/integration/tests/schema_only.py +++ b/sink-connector-lightweight/tests/integration/tests/schema_only.py @@ -15,7 +15,7 @@ def create_table_structure(self, table_name): with By(f"creating a {table_name} table"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"col1 varchar(255), col2 int", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/sink_cli_commands.py b/sink-connector-lightweight/tests/integration/tests/sink_cli_commands.py index 4f425b480..818336d0c 100644 --- a/sink-connector-lightweight/tests/integration/tests/sink_cli_commands.py +++ b/sink-connector-lightweight/tests/integration/tests/sink_cli_commands.py @@ -47,7 +47,7 @@ def create_and_validate_table(self, table_name): "creating a table in MySQL and checking that it was also created in ClickHouse" ): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns=f"col1 varchar(255), col2 int", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) diff --git a/sink-connector-lightweight/tests/integration/tests/steps/alter.py b/sink-connector-lightweight/tests/integration/tests/steps/alter.py index 33ba1ca56..a6135c008 100644 --- a/sink-connector-lightweight/tests/integration/tests/steps/alter.py +++ b/sink-connector-lightweight/tests/integration/tests/steps/alter.py @@ -20,7 +20,7 @@ def add_column( node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type};" + rf"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type};" ) @@ -41,7 +41,7 @@ def rename_column( node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` RENAME COLUMN {column_name} to {new_column_name};" + rf"ALTER TABLE {database}.\`{table_name}\` RENAME COLUMN {column_name} to {new_column_name};" ) @@ -63,7 +63,7 @@ def change_column( node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` CHANGE COLUMN {column_name} {new_column_name} {new_column_type};" + rf"ALTER TABLE {database}.\`{table_name}\` CHANGE COLUMN {column_name} {new_column_name} {new_column_type};" ) @@ -84,7 +84,7 @@ def modify_column( node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` MODIFY COLUMN {column_name} {new_column_type};" + rf"ALTER TABLE {database}.\`{table_name}\` MODIFY COLUMN {column_name} {new_column_type};" ) @@ -97,7 +97,7 @@ def drop_column(self, table_name, column_name="new_col", node=None, database=Non if node is None: node = self.context.cluster.node("mysql-master") - node.query(f"ALTER TABLE {database}.\`{table_name}\` DROP COLUMN {column_name};") + node.query(rf"ALTER TABLE {database}.\`{table_name}\` DROP COLUMN {column_name};") @TestStep(When) @@ -149,7 +149,7 @@ def add_column_null_not_null( null_not_null = "NOT NULL" if not is_null else "NULL" node.query( - f"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type} {null_not_null};" + rf"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type} {null_not_null};" ) @@ -171,7 +171,7 @@ def add_column_default( node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type} DEFAULT {default_value};" + rf"ALTER TABLE {database}.\`{table_name}\` ADD COLUMN {column_name} {column_type} DEFAULT {default_value};" ) @@ -185,5 +185,5 @@ def add_primary_key(self, table_name, column_name, node=None, database=None): node = self.context.cluster.node("mysql-master") node.query( - f"ALTER TABLE {database}.\`{table_name}\` ADD PRIMARY KEY ({column_name});" + rf"ALTER TABLE {database}.\`{table_name}\` ADD PRIMARY KEY ({column_name});" ) diff --git a/sink-connector-lightweight/tests/integration/tests/steps/mysql.py b/sink-connector-lightweight/tests/integration/tests/steps/mysql.py index 6892ffc76..e98026bc3 100644 --- a/sink-connector-lightweight/tests/integration/tests/steps/mysql.py +++ b/sink-connector-lightweight/tests/integration/tests/steps/mysql.py @@ -350,7 +350,7 @@ def insert(self, table_name, values, node=None, database_name=None): node = self.context.cluster.node("mysql-master") with When("I insert data into MySQL table"): - node.query(f"INSERT INTO {database_name}.\`{table_name}\` VALUES ({values});") + node.query(rf"INSERT INTO {database_name}.\`{table_name}\` VALUES ({values});") @TestStep(Given) diff --git a/sink-connector-lightweight/tests/integration/tests/table_names.py b/sink-connector-lightweight/tests/integration/tests/table_names.py index e958d54e5..331e2d621 100644 --- a/sink-connector-lightweight/tests/integration/tests/table_names.py +++ b/sink-connector-lightweight/tests/integration/tests/table_names.py @@ -52,25 +52,25 @@ def check_table_names(self, table_name): with Given(f"I create the {table_name} table"): create_mysql_to_clickhouse_replicated_table( - name=f"\`{table_name}\`", + name=rf"\`{table_name}\`", mysql_columns="x INT", clickhouse_columns="x Int32", clickhouse_table_engine=self.context.clickhouse_table_engines[0], ) with And("I insert data into the table"): - mysql_node.query(f"INSERT INTO \`{table_name}\` VALUES (1, 1);") + mysql_node.query(rf"INSERT INTO \`{table_name}\` VALUES (1, 1);") with Then(f"I check that the {table_name} was created in the ClickHouse side"): for retry in retries(timeout=40, delay=1): with retry: - clickhouse_node.query(f"EXISTS test.\`{table_name}\`", message="1") + clickhouse_node.query(rf"EXISTS test.\`{table_name}\`", message="1") with And("I check that the data was inserted correctly into the ClickHouse table"): for retry in retries(timeout=40, delay=1): with retry: clickhouse_data = clickhouse_node.query( - f"SELECT id,x FROM test.\`{table_name}\` FORMAT CSV" + rf"SELECT id,x FROM test.\`{table_name}\` FORMAT CSV" ) assert clickhouse_data.output.strip() == "1,1", error() diff --git a/sink-connector/tests/integration/requirements.txt b/sink-connector/tests/integration/requirements.txt index c190f0357..2e9be8ba7 100644 --- a/sink-connector/tests/integration/requirements.txt +++ b/sink-connector/tests/integration/requirements.txt @@ -1,4 +1,10 @@ -docker-compose==1.29.2 testflows==2.1.5 -docker==6.1.3 +python-dateutil==2.9.0 +numpy==1.26.4 +pyarrow==16.1.0 +pandas==2.2.0 +PyYAML==5.3.1 +docker-compose==1.29.2 awscli==1.27.36 +docker==6.1.3 +requests==2.31.0 \ No newline at end of file From d456bb0564fbb6337b30b9bd490b2c6ec266a75d Mon Sep 17 00:00:00 2001 From: selfeer Date: Mon, 10 Jun 2024 13:57:59 +0400 Subject: [PATCH 4/6] fix mysql version --- .../tests/integration/env/auto/docker-compose.yml | 2 +- sink-connector/tests/integration/env/docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sink-connector-lightweight/tests/integration/env/auto/docker-compose.yml b/sink-connector-lightweight/tests/integration/env/auto/docker-compose.yml index 080c314ec..d5af369e7 100644 --- a/sink-connector-lightweight/tests/integration/env/auto/docker-compose.yml +++ b/sink-connector-lightweight/tests/integration/env/auto/docker-compose.yml @@ -3,7 +3,7 @@ version: "2.3" services: mysql-master: - image: docker.io/bitnami/mysql:8.0 + image: docker.io/bitnami/mysql:8.0.36 restart: "no" expose: - "3306" diff --git a/sink-connector/tests/integration/env/docker-compose.yml b/sink-connector/tests/integration/env/docker-compose.yml index 969603d29..7c6733b67 100644 --- a/sink-connector/tests/integration/env/docker-compose.yml +++ b/sink-connector/tests/integration/env/docker-compose.yml @@ -4,7 +4,7 @@ version: "2.3" services: mysql-master: container_name: mysql-master - image: docker.io/bitnami/mysql:8.0 + image: docker.io/bitnami/mysql:8.0.36 restart: "no" expose: - "3306" From 9a3ac10b74587c717f1c9ed97d3ed5493da07513 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 10 Jun 2024 15:48:14 -0400 Subject: [PATCH 5/6] Add ON CLUSTER to AutoCreateTable flow when a new record is inserted --- .../connector/db/operations/ClickHouseAutoCreateTable.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java index 7104be9bb..1d248d2b8 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java @@ -49,7 +49,12 @@ public java.lang.String createTableSyntax(ArrayList primaryKey, String t StringBuilder createTableSyntax = new StringBuilder(); - createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`").append("("); + createTableSyntax.append(CREATE_TABLE).append(" ").append(databaseName).append(".").append("`").append(tableName).append("`"); + if(useReplicatedReplacingMergeTree == true) { + createTableSyntax.append(" ON CLUSTER `{cluster}` "); + } + + createTableSyntax.append("("); for(Field f: fields) { String colName = f.name(); From 4e008215f6b79b6421f5a2b074f0c1996135fbae Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 10 Jun 2024 21:46:40 -0400 Subject: [PATCH 6/6] Added config to support RRMT --- sink-connector-lightweight/clickhouse/config.xml | 1 + sink-connector-lightweight/docker/config_postgres.yml | 10 +++++++--- .../docker/docker-compose-postgres.yml | 8 ++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sink-connector-lightweight/clickhouse/config.xml b/sink-connector-lightweight/clickhouse/config.xml index a94e439cd..34d1259ae 100644 --- a/sink-connector-lightweight/clickhouse/config.xml +++ b/sink-connector-lightweight/clickhouse/config.xml @@ -8,6 +8,7 @@ 15000 + default clickhouse 02 diff --git a/sink-connector-lightweight/docker/config_postgres.yml b/sink-connector-lightweight/docker/config_postgres.yml index 491a544b8..4453291a5 100644 --- a/sink-connector-lightweight/docker/config_postgres.yml +++ b/sink-connector-lightweight/docker/config_postgres.yml @@ -3,6 +3,8 @@ # name: Unique name for the connector. Attempting to register again with the same name will fail. name: "debezium-embedded-postgres" +auto.create.tables.replicated: "true" + # database.hostname: IP address or hostname of the PostgreSQL database server. database.hostname: "postgres" @@ -19,13 +21,15 @@ database.password: "root" database.server.name: "ER54" # schema.include.list: An optional list of regular expressions that match schema names to be monitored; -schema.include.list: public +schema.include.list: public,public2 + +slot.name: connector2 # plugin.name: The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are decoderbufs, and pgoutput. plugin.name: "pgoutput" # table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored; -table.include.list: "public.tm,public.tm2" +#table.include.list: "public.tm,public.tm2" # clickhouse.server.url: Specify only the hostname of the Clickhouse Server. clickhouse.server.url: "clickhouse" @@ -118,4 +122,4 @@ database.dbname: "public" #disable.ddl: "false" #disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false. -#disable.drop.truncate: "false" \ No newline at end of file +#disable.drop.truncate: "false" diff --git a/sink-connector-lightweight/docker/docker-compose-postgres.yml b/sink-connector-lightweight/docker/docker-compose-postgres.yml index 484f9a214..587a3db36 100644 --- a/sink-connector-lightweight/docker/docker-compose-postgres.yml +++ b/sink-connector-lightweight/docker/docker-compose-postgres.yml @@ -17,6 +17,14 @@ services: extends: file: clickhouse-service.yml service: clickhouse + depends_on: + zookeeper: + condition: service_healthy + + zookeeper: + extends: + file: zookeeper-service.yml + service: zookeeper clickhouse-sink-connector-lt: extends: