diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml
index 8ca1e0676..4c19d768e 100644
--- a/sink-connector-lightweight/dependency-reduced-pom.xml
+++ b/sink-connector-lightweight/dependency-reduced-pom.xml
@@ -65,6 +65,12 @@
all
10
+
+
+ listener
+ com.altinity.clickhouse.debezium.embedded.FailFastListener
+
+
true
true
true
@@ -97,7 +103,7 @@
io.debezium
debezium-connector-mongodb
- 2.5.0.Alpha1
+ 2.5.0.Beta1
test
@@ -299,7 +305,7 @@
UTF-8
3.1.1
17
- 2.5.0.Alpha1
+ 2.5.0.Beta1
io.quarkus.platform
diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml
index a8453ea67..6e8939189 100644
--- a/sink-connector-lightweight/pom.xml
+++ b/sink-connector-lightweight/pom.xml
@@ -425,18 +425,6 @@
${sink-connector-library-version}
compile
-
- com.altinity
- clickhouse-kafka-sink-connector
- 0.0.6
- test
-
-
- com.altinity
- clickhouse-kafka-sink-connector
- 0.0.6
- compile
-
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
index 8e77af41a..a5f788a55 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
@@ -15,8 +15,11 @@
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
/**
* This class contains the only overridden functions from the generated parser.
@@ -92,11 +95,15 @@ public void enterCopyCreateTable(MySqlParser.CopyCreateTableContext copyCreateTa
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCreateTableContext) {
StringBuilder orderByColumns = new StringBuilder();
StringBuilder partitionByColumn = new StringBuilder();
- parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
+ Set columnNames = parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
//this.query.append(" Engine=")
+ String isDeletedColumn = IS_DELETED_COLUMN;
+ if(columnNames.contains(isDeletedColumn)) {
+ isDeletedColumn = "__" + IS_DELETED_COLUMN;
+ }
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
- this.query.append("`").append(IS_DELETED_COLUMN).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
+ this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
} else {
this.query.append("`").append(SIGN_COLUMN).append("` ").append(SIGN_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
@@ -104,7 +111,7 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
- this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(IS_DELETED_COLUMN).append(")");
+ this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
@@ -119,10 +126,10 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
}
- private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
+ private Set parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
StringBuilder partitionByColumns) {
List pt = ctx.children;
-
+ Set columnNames = new HashSet<>();
this.query.append(Constants.CREATE_TABLE).append(" ");
for (ParseTree tree : pt) {
@@ -136,66 +143,7 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
if (subtree instanceof TerminalNodeImpl) {
// this.query.append(subtree.getText());
} else if (subtree instanceof MySqlParser.ColumnDeclarationContext) {
- String columnName = null;
- String colDataType = null;
- boolean isNullColumn = true;
- boolean isGeneratedColumn = false;
- String generatedColumn = "";
-
- for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
- if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
- columnName = colDefTree.getText();
- this.query.append(columnName).append(" ");
- } else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
- String colDataTypeDefinition = colDefTree.getText();
-
- colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
- // Null Column and DimensionDataType are children of ColumnDefinition
- for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
- if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
- if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
- isNullColumn = false;
- } else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
- if (colDefinitionChildTree.getText() != null) {
-
- }
- } else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
- for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
- System.out.println(primaryKeyTree.getText());
- orderByColumns.append(columnName);
- break;
- }
- } else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
- for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
- if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
- isGeneratedColumn = true;
- generatedColumn = generatedColumnTree.getText();
- //this.query.append(Constants.AS).append(" ").append(expression);
- }
- }
-
- }
- }
- if(isGeneratedColumn) {
- if(isNullColumn){
- this.query.append(Constants.NULLABLE).append("(").append(colDataType)
- .append(")");
- } else
- this.query.append(colDataType);
-
- this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
- continue;
- }
-
- if(isNullColumn) {
- this.query.append(Constants.NULLABLE).append("(").append(colDataType)
- .append(")").append(",");
- }
- else {
- this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
- }
- }
- }
+ parseColumnDefinitions(subtree, orderByColumns, columnNames);
} else if(subtree instanceof MySqlParser.ConstraintDeclarationContext) {
for(ParseTree constraintTree: ((MySqlParser.ConstraintDeclarationContext) subtree).children) {
if(constraintTree instanceof MySqlParser.PrimaryKeyTableConstraintContext) {
@@ -234,6 +182,79 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
}
}
}
+
+ return columnNames;
+ }
+
+ /**
+ * Function to parse column definitions.
+ * @param subtree
+ * @param orderByColumns
+ *
+ * @return list of column names
+ */
+ private void parseColumnDefinitions(ParseTree subtree, StringBuilder orderByColumns, Set columnNames) {
+ String columnName = null;
+ String colDataType = null;
+ boolean isNullColumn = true;
+ boolean isGeneratedColumn = false;
+ String generatedColumn = "";
+
+ for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
+ if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
+ columnName = colDefTree.getText();
+ this.query.append(columnName).append(" ");
+ } else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
+ String colDataTypeDefinition = colDefTree.getText();
+
+ colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
+ // Null Column and DimensionDataType are children of ColumnDefinition
+ for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
+ if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
+ if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
+ isNullColumn = false;
+ } else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
+ if (colDefinitionChildTree.getText() != null) {
+
+ }
+ } else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
+ for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
+ System.out.println(primaryKeyTree.getText());
+ orderByColumns.append(columnName);
+ break;
+ }
+ } else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
+ for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
+ if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
+ isGeneratedColumn = true;
+ generatedColumn = generatedColumnTree.getText();
+ //this.query.append(Constants.AS).append(" ").append(expression);
+ }
+ }
+
+ }
+ }
+ if(isGeneratedColumn) {
+ if(isNullColumn){
+ this.query.append(Constants.NULLABLE).append("(").append(colDataType)
+ .append(")");
+ } else
+ this.query.append(colDataType);
+
+ this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
+ continue;
+ }
+
+ if(isNullColumn) {
+ this.query.append(Constants.NULLABLE).append("(").append(colDataType)
+ .append(")").append(",");
+ }
+ else {
+ this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
+ }
+ columnNames.add(columnName);
+ }
+ }
}
/**
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java
new file mode 100644
index 000000000..9adde346c
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java
@@ -0,0 +1,112 @@
+package com.altinity.clickhouse.debezium.embedded.ddl.parser;
+
+import com.altinity.clickhouse.debezium.embedded.ITCommon;
+import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
+import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Testcontainers
+@DisplayName("Integration Test that validates DDL Creation when there are source columns with the same name(is_deleted)")
+public class IsDeletedColumnsIT {
+
+ protected MySQLContainer mySqlContainer;
+ static ClickHouseContainer clickHouseContainer;
+
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("data_types.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ // clickHouseContainer.start();
+ Thread.sleep(15000);
+ }
+
+ static {
+ clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_it.sql")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withExposedPorts(8123);
+
+ clickHouseContainer.start();
+ }
+
+
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:latest",
+ "clickhouse/clickhouse-server:22.3"
+ })
+ @DisplayName("Test that validates create table in CH when MySQL has is_deleted columns")
+ public void testIsDeleted(String clickHouseServerVersion) throws Exception {
+
+ AtomicReference engine = new AtomicReference<>();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+
+ engine.set(new DebeziumChangeEventCapture());
+ engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
+ new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())),false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ Thread.sleep(10000);
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+ conn.prepareStatement("create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int)").execute();
+
+ Thread.sleep(10000);
+
+ conn.prepareStatement("insert into new_table values('test', 1, 22, 1)").execute();
+ conn.close();
+ Thread.sleep(10000);
+
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
+ ResultSet rs = writer.executeQueryWithResultSet("select * from new_table");
+ boolean recordFound = false;
+ while(rs.next()) {
+ recordFound = true;
+ Assert.assertTrue(rs.getString("col1").equalsIgnoreCase("test"));
+ Assert.assertTrue(rs.getInt("col2") == 1);
+ Assert.assertTrue(rs.getInt("is_deleted") == 22);
+ Assert.assertTrue(rs.getInt("_sign") == 1);
+ }
+ Assert.assertTrue(recordFound);
+
+ if(engine.get() != null) {
+ engine.get().stop();
+ }
+ executorService.shutdown();
+ }
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
index afcc01e69..5b89ecd0e 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
@@ -591,6 +591,16 @@ public void testGeneratedColumn() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE employees.contacts(fullname Nullable(String) ALIAS CONCAT(first_name,' ',last_name),email String NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()"));
}
+ @Test
+ public void testSourceWithIsDeletedColumn() {
+ StringBuffer clickHouseQuery = new StringBuffer();
+
+ String sql = "create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int);";
+ mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);
+
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE new_table(col1 Nullable(String),col2 Nullable(Int32),is_deleted Nullable(Int32),_sign Nullable(Int32),`_version` UInt64,`__is_deleted` UInt8) Engine=ReplacingMergeTree(_version,__is_deleted) ORDER BY tuple()"));
+ }
+
@ParameterizedTest
@CsvSource({
"ALTER TABLE test_table rename to test_table_new, false",
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
index 140e6ec96..b072a787f 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
@@ -21,6 +21,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
@@ -34,12 +35,7 @@
@DisplayName("Integration Test that validates DDL(Create, ALTER, RENAME) on Clickhouse 22.3 and latest docker tags")
public class TableOperationsIT {
protected MySQLContainer mySqlContainer;
- protected ClickHouseContainer clickHouseContainer;
-
-// @Container
-// public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
-// .withInitScript("init_clickhouse.sql")
-// .withExposedPorts(8123);
+ static ClickHouseContainer clickHouseContainer;
@BeforeEach
public void startContainers() throws InterruptedException {
@@ -56,14 +52,7 @@ public void startContainers() throws InterruptedException {
Thread.sleep(15000);
}
- @ParameterizedTest
- @CsvSource({
- "clickhouse/clickhouse-server:latest",
- "clickhouse/clickhouse-server:22.3"
- })
- public void testTableOperations(String clickHouseServerVersion) throws Exception {
- //,String rcxExpectedResult) throws Exception {
-
+ static {
clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
@@ -72,6 +61,14 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
.withExposedPorts(8123);
clickHouseContainer.start();
+ }
+ @ParameterizedTest
+ @CsvSource({
+ "clickhouse/clickhouse-server:latest",
+ "clickhouse/clickhouse-server:22.3"
+ })
+ @DisplayName("Test that validates DDL(Create, ALTER, RENAME)")
+ public void testTableOperations(String clickHouseServerVersion) throws Exception {
AtomicReference engine = new AtomicReference<>();
@@ -94,11 +91,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
conn.prepareStatement("RENAME TABLE ship_class to ship_class_new, add_test to add_test_new").execute();
conn.prepareStatement("RENAME TABLE ship_class_new to ship_class_new2").execute();
conn.prepareStatement("ALTER TABLE ship_class_new2 rename ship_class_new3").execute();
-
- //conn.prepareStatement("ALTER TABLE ship_class_new2 rename ship_class_new3").execute();
-
conn.prepareStatement("create table new_table(col1 varchar(255), col2 int, col3 int)").execute();
-
conn.prepareStatement("CREATE TABLE members (\n" +
" firstname VARCHAR(25) NOT NULL,\n" +
" lastname VARCHAR(25) NOT NULL,\n" +
@@ -108,9 +101,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
")\n" +
"PARTITION BY KEY(joined)\n" +
"PARTITIONS 6;").execute();
-
conn.prepareStatement("create table copied_table like new_table").execute();
-
conn.prepareStatement("CREATE TABLE rcx ( a INT not null, b INT, c CHAR(3) not null, d INT not null) PARTITION BY RANGE COLUMNS(a,d,c) ( PARTITION p0 VALUES LESS THAN (5,10,'ggg'));").execute();
Thread.sleep(10000);
@@ -188,23 +179,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
}
-// else {
-// Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" +
-// "(\n" +
-// " `a` Int32,\n" +
-// " `b` Nullable(Int32),\n" +
-// " `c` String,\n" +
-// " `d` Int32,\n" +
-// " `_sign` Int8,\n" +
-// " `_version` UInt64\n" +
-// ")\n" +
-// "ENGINE = ReplacingMergeTree(_version, is_deleted)\n" +
-// "PARTITION BY (a, d, c)\n" +
-// "ORDER BY tuple()\n" +
-// "SETTINGS index_granularity = 8192"));
-// }
-
-// new com.altinity.clickhouse.sink.connector.db.DBMetadata().getTableEngine(writer.getConnection(), "employees", "rmt_test");
+
if(engine.get() != null) {
engine.get().stop();
@@ -214,5 +189,4 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
}
-
}