From fe74bdcb9a2fbc39fdbb6951270022a8acdf80f6 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 4 Jul 2024 11:58:52 -0400 Subject: [PATCH] Added logic to move the CREATE VIEW operation to after the connector has started --- .../cdc/DebeziumChangeEventCapture.java | 72 +++++++++++-------- .../src/main/resources/config.properties | 46 ++++-------- .../ClickHouseSinkConnectorConfig.java | 14 ++++ ...lickHouseSinkConnectorConfigVariables.java | 1 + 4 files changed, 71 insertions(+), 62 deletions(-) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 4374a405b..23e966bd2 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -307,40 +307,52 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf "system", dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); - String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + - JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); - if(tableName.contains(".")) { - String[] dbTableNameArray = tableName.split("\\."); - if(dbTableNameArray.length >= 2) { - String dbName = dbTableNameArray[0].replace("\"", ""); - String createDbQuery = String.format("create database if not exists %s", dbName); - log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery); - writer.executeQuery(createDbQuery); - - - // Also create view. - String view = " CREATE VIEW IF NOT EXISTS %s.show_replica_status\n" + - " AS\n" + - " SELECT\n" + - " now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source,\n" + - " toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time,\n" + - " fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time,\n" + - " *\n" + - " FROM %s\n" + - " FINAL"; - String formattedView = String.format(view, dbName, tableName); - try { - writer.executeQuery(formattedView); - } catch(Exception e) { - log.error("**** Error creating VIEW **** " + formattedView); - } - } - } + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + String databaseName = tableNameDatabaseName.getRight(); + + String createDbQuery = String.format("create database if not exists %s", databaseName); + log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery); + writer.executeQuery(createDbQuery); + } catch(Exception e) { log.error("Error creating Debezium storage database", e); } } + /** + * Function to create view for show_replica_status + * @param config + * @param props + */ + private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config, Properties props) { + String view = props.getProperty(ClickHouseSinkConnectorConfigVariables.REPLICA_STATUS_VIEW.toString()); + if(view == null || view.isEmpty() == true) { + log.warn("Skipping creating view for replica_status as the query was not provided in configuration"); + return; + } + DBCredentials dbCredentials = parseDBConfiguration(config); + + String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), + "system"); + ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config); + BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), + "system", dbCredentials.getUserName(), + dbCredentials.getPassword(), config, conn); + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + + String tableName = tableNameDatabaseName.getLeft(); + String dbName = tableNameDatabaseName.getRight(); + + String formattedView = String.format(view, dbName, dbName + "." + tableName); + // Remove quotes. + formattedView = formattedView.replace("\"", ""); + try { + writer.executeQuery(formattedView); + } catch(Exception e) { + log.error("**** Error creating VIEW **** " + formattedView); + } + } + /** * * @param props @@ -623,6 +635,8 @@ public void handle(boolean b, String s, Throwable throwable) { public void connectorStarted() { isReplicationRunning = true; log.debug("Connector started"); + // Create view. + createViewForShowReplicaStatus(config, props); } @Override diff --git a/sink-connector-lightweight/src/main/resources/config.properties b/sink-connector-lightweight/src/main/resources/config.properties index 495e361ee..48ad71c22 100644 --- a/sink-connector-lightweight/src/main/resources/config.properties +++ b/sink-connector-lightweight/src/main/resources/config.properties @@ -1,45 +1,25 @@ name= "engine" -#offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore -#offset.storage.file.filename=/tmp/offsets.dat - database.server.name= "clickhouse-ddl" database.server.id= 976 -#database.history= "io.debezium.relational.history.FileDatabaseHistory" -#database.history.file.filename=/tmp/dbhistory.dat -#connector.class= io.debezium.connector.mysql.MySqlConnector converter.schemas.enable= "true" schemas.enable= true topic.prefix=embeddedconnector offset.storage=io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore offset.storage.jdbc.offset.table.name= "default.replica_source_info" -offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123" -offset.storage.jdbc.user: "root" -offset.storage.jdbc.password: "root" -offset.storage.jdbc.offset.table.ddl: "CREATE TABLE %s -( -`id` String, -`offset_key` String, -`offset_val` String, -`record_insert_ts` DateTime, -`record_insert_seq` UInt64, -`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)) -) -ENGINE = ReplacingMergeTree(_version) -ORDER BY id -SETTINGS index_granularity = 8192" -offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1" -schema.history.internal: io.debezium.storage.jdbc.history.JdbcSchemaHistory -schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123" -schema.history.internal.jdbc.user: "root" -schema.history.internal.jdbc.password: "root" -schema.history.internal.schema.history.table.ddl: "CREATE TABLE %s -(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, -`record_insert_seq` INTEGER NOT NULL) Engine=ReplacingMergeTree(record_insert_seq) order by id" - -schema.history.internal.schema.history.table.name: "default.replicate_schema_history" - +offset.storage.jdbc.url= "jdbc:clickhouse://clickhouse:8123" +offset.storage.jdbc.user= "root" +offset.storage.jdbc.password= "root" +offset.storage.jdbc.offset.table.ddl="CREATE TABLE %s(`id` String, `offset_key` String, `offset_val` String, `record_insert_ts` DateTime, `record_insert_seq` UInt64, `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9)))ENGINE = ReplacingMergeTree(_version) ORDER BY id SETTINGS index_granularity = 8192" +offset.storage.jdbc.offset.table.delete="delete from %s where 1=1" +schema.history.internal=io.debezium.storage.jdbc.history.JdbcSchemaHistory +schema.history.internal.jdbc.url="jdbc:clickhouse://clickhouse:8123" +schema.history.internal.jdbc.user="root" +schema.history.internal.jdbc.password="root" +schema.history.internal.schema.history.table.ddl="CREATE TABLE %s(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) Engine=ReplacingMergeTree(record_insert_seq) order by id" +schema.history.internal.schema.history.table.name="default.replicate_schema_history" auto.create.tables= false replacingmergetree.delete.column=_sign metrics.enable= true metrics.port= 8083 -snapshot.mode= "initial" \ No newline at end of file +snapshot.mode= "initial" +replica.status.view="CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s FINAL" \ No newline at end of file diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index 95c6aec1e..0335c1257 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -450,6 +450,20 @@ static ConfigDef newConfigDef() { 6, ConfigDef.Width.NONE, ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) + .define( + ClickHouseSinkConnectorConfigVariables.REPLICA_STATUS_VIEW.toString(), + Type.STRING, + "CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - " + + "fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, " + + "toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, " + + "fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time," + + "* FROM %s FINAL", + Importance.HIGH, + "SQL query to get replica status, lag etc.", + CONFIG_GROUP_CONNECTOR_CONFIG, + 6, + ConfigDef.Width.NONE, + ClickHouseSinkConnectorConfigVariables.REPLICA_STATUS_VIEW.toString()) ; } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index 11a9ce17f..20001e092 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -72,6 +72,7 @@ public enum ClickHouseSinkConnectorConfigVariables { RESTART_EVENT_LOOP_TIMEOUT_PERIOD("restart.event.loop.timeout.period.secs"), JDBC_PARAMETERS("clickhouse.jdbc.params"), + REPLICA_STATUS_VIEW("replica.status.view"), MAX_QUEUE_SIZE("sink.connector.max.queue.size"); private String label;