Skip to content

Commit

Permalink
Merge pull request #434 from Altinity/431-support-source-tables-with-…
Browse files Browse the repository at this point in the history
…is_deleted-columns

Rename is_deleted column if source tables have column with the same name
  • Loading branch information
subkanthi authored Jan 12, 2024
2 parents 90325d8 + d71c32f commit f1eb58a
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 117 deletions.
10 changes: 8 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
</includes>
<parallel>all</parallel>
<threadCount>10</threadCount>
<properties>
<property>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
Expand Down Expand Up @@ -97,7 +103,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.5.0.Alpha1</version>
<version>2.5.0.Beta1</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -299,7 +305,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.5.0.Alpha1</version.debezium>
<version.debezium>2.5.0.Beta1</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
12 changes: 0 additions & 12 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,6 @@
<version>${sink-connector-library-version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.6</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<extensions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* This class contains the only overridden functions from the generated parser.
Expand Down Expand Up @@ -92,19 +95,23 @@ public void enterCopyCreateTable(MySqlParser.CopyCreateTableContext copyCreateTa
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCreateTableContext) {
StringBuilder orderByColumns = new StringBuilder();
StringBuilder partitionByColumn = new StringBuilder();
parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
Set<String> columnNames = parseCreateTable(columnCreateTableContext, orderByColumns, partitionByColumn);
//this.query.append(" Engine=")
String isDeletedColumn = IS_DELETED_COLUMN;
if(columnNames.contains(isDeletedColumn)) {
isDeletedColumn = "__" + IS_DELETED_COLUMN;
}
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(IS_DELETED_COLUMN).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
} else {
this.query.append("`").append(SIGN_COLUMN).append("` ").append(SIGN_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE);
}

this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(IS_DELETED_COLUMN).append(")");
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");

Expand All @@ -119,10 +126,10 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr

}

private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
private Set<String> parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder orderByColumns,
StringBuilder partitionByColumns) {
List<ParseTree> pt = ctx.children;

Set<String> columnNames = new HashSet<>();

this.query.append(Constants.CREATE_TABLE).append(" ");
for (ParseTree tree : pt) {
Expand All @@ -136,66 +143,7 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
if (subtree instanceof TerminalNodeImpl) {
// this.query.append(subtree.getText());
} else if (subtree instanceof MySqlParser.ColumnDeclarationContext) {
String columnName = null;
String colDataType = null;
boolean isNullColumn = true;
boolean isGeneratedColumn = false;
String generatedColumn = "";

for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
columnName = colDefTree.getText();
this.query.append(columnName).append(" ");
} else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
String colDataTypeDefinition = colDefTree.getText();

colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
// Null Column and DimensionDataType are children of ColumnDefinition
for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
isNullColumn = false;
} else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
if (colDefinitionChildTree.getText() != null) {

}
} else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
System.out.println(primaryKeyTree.getText());
orderByColumns.append(columnName);
break;
}
} else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
isGeneratedColumn = true;
generatedColumn = generatedColumnTree.getText();
//this.query.append(Constants.AS).append(" ").append(expression);
}
}

}
}
if(isGeneratedColumn) {
if(isNullColumn){
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")");
} else
this.query.append(colDataType);

this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
continue;
}

if(isNullColumn) {
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")").append(",");
}
else {
this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
}
}
}
parseColumnDefinitions(subtree, orderByColumns, columnNames);
} else if(subtree instanceof MySqlParser.ConstraintDeclarationContext) {
for(ParseTree constraintTree: ((MySqlParser.ConstraintDeclarationContext) subtree).children) {
if(constraintTree instanceof MySqlParser.PrimaryKeyTableConstraintContext) {
Expand Down Expand Up @@ -234,6 +182,79 @@ private void parseCreateTable(MySqlParser.CreateTableContext ctx, StringBuilder
}
}
}

return columnNames;
}

/**
* Function to parse column definitions.
* @param subtree
* @param orderByColumns
*
* @return list of column names
*/
private void parseColumnDefinitions(ParseTree subtree, StringBuilder orderByColumns, Set<String> columnNames) {
String columnName = null;
String colDataType = null;
boolean isNullColumn = true;
boolean isGeneratedColumn = false;
String generatedColumn = "";

for (ParseTree colDefTree : ((MySqlParser.ColumnDeclarationContext) subtree).children) {
if (colDefTree instanceof MySqlParser.FullColumnNameContext) {
columnName = colDefTree.getText();
this.query.append(columnName).append(" ");
} else if (colDefTree instanceof MySqlParser.ColumnDefinitionContext) {
String colDataTypeDefinition = colDefTree.getText();

colDataType = getClickHouseDataType(colDataTypeDefinition, colDefTree, columnName);
// Null Column and DimensionDataType are children of ColumnDefinition
for(ParseTree colDefinitionChildTree: ((MySqlParser.ColumnDefinitionContext) colDefTree).children) {
if (colDefinitionChildTree instanceof MySqlParser.NullColumnConstraintContext) {
if (colDefinitionChildTree.getText().equalsIgnoreCase(Constants.NOT_NULL))
isNullColumn = false;
} else if(colDefinitionChildTree instanceof MySqlParser.DimensionDataTypeContext) {
if (colDefinitionChildTree.getText() != null) {

}
} else if (colDefinitionChildTree instanceof MySqlParser.PrimaryKeyColumnConstraintContext) {
for(ParseTree primaryKeyTree: ((MySqlParser.PrimaryKeyColumnConstraintContext) colDefinitionChildTree).children) {
System.out.println(primaryKeyTree.getText());
orderByColumns.append(columnName);
break;
}
} else if (colDefinitionChildTree instanceof MySqlParser.GeneratedColumnConstraintContext) {
for(ParseTree generatedColumnTree: ((MySqlParser.GeneratedColumnConstraintContext) colDefinitionChildTree).children) {
if(generatedColumnTree instanceof MySqlParser.ExpressionContext) {
isGeneratedColumn = true;
generatedColumn = generatedColumnTree.getText();
//this.query.append(Constants.AS).append(" ").append(expression);
}
}

}
}
if(isGeneratedColumn) {
if(isNullColumn){
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")");
} else
this.query.append(colDataType);

this.query.append(" ").append(Constants.ALIAS).append(" ").append(generatedColumn).append(",");
continue;
}

if(isNullColumn) {
this.query.append(Constants.NULLABLE).append("(").append(colDataType)
.append(")").append(",");
}
else {
this.query.append(colDataType).append(" ").append(Constants.NOT_NULLABLE).append(" ").append(",");
}
columnNames.add(columnName);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Integration Test that validates DDL Creation when there are source columns with the same name(is_deleted)")
public class IsDeletedColumnsIT {

protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);
}

static {
clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

clickHouseContainer.start();
}


@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:latest",
"clickhouse/clickhouse-server:22.3"
})
@DisplayName("Test that validates create table in CH when MySQL has is_deleted columns")
public void testIsDeleted(String clickHouseServerVersion) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});


Thread.sleep(10000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int)").execute();

Thread.sleep(10000);

conn.prepareStatement("insert into new_table values('test', 1, 22, 1)").execute();
conn.close();
Thread.sleep(10000);

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
ResultSet rs = writer.executeQueryWithResultSet("select * from new_table");
boolean recordFound = false;
while(rs.next()) {
recordFound = true;
Assert.assertTrue(rs.getString("col1").equalsIgnoreCase("test"));
Assert.assertTrue(rs.getInt("col2") == 1);
Assert.assertTrue(rs.getInt("is_deleted") == 22);
Assert.assertTrue(rs.getInt("_sign") == 1);
}
Assert.assertTrue(recordFound);

if(engine.get() != null) {
engine.get().stop();
}
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,16 @@ public void testGeneratedColumn() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE employees.contacts(fullname Nullable(String) ALIAS CONCAT(first_name,' ',last_name),email String NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()"));
}

@Test
public void testSourceWithIsDeletedColumn() {
StringBuffer clickHouseQuery = new StringBuffer();

String sql = "create table new_table(col1 varchar(255), col2 int, is_deleted int, _sign int);";
mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);

Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE new_table(col1 Nullable(String),col2 Nullable(Int32),is_deleted Nullable(Int32),_sign Nullable(Int32),`_version` UInt64,`__is_deleted` UInt8) Engine=ReplacingMergeTree(_version,__is_deleted) ORDER BY tuple()"));
}

@ParameterizedTest
@CsvSource({
"ALTER TABLE test_table rename to test_table_new, false",
Expand Down
Loading

0 comments on commit f1eb58a

Please sign in to comment.