Merge pull request #29 from Altinity/fix_auto_create_table_retry
Fix auto create table retry
subkanthi authored Aug 2, 2022
2 parents cc90b40 + 46d5829 commit 01c939c
Showing 15 changed files with 140 additions and 67 deletions.
7 changes: 6 additions & 1 deletion README.md
@@ -3,8 +3,9 @@
 Sink connector sinks data from Kafka into ClickHouse.
 The connector is tested with the following converters
 - JsonConverter
-- AvroConverter (Using [Apicurio Schema Registry](https://www.apicur.io/registry/))
+- AvroConverter (Using [Apicurio Schema Registry](https://www.apicur.io/registry/) and Confluent Schema Registry)
 
+![](doc/img/sink_connector_mysql_architecture.jpg)
 # Features
 - Inserts, Updates and Deletes using ReplacingMergeTree/CollapsingMergeTree - [Updates/Deletes](doc/mutable_data.md)
 - Deduplication logic to dedupe records from the Kafka topic (based on the primary key)
@@ -17,6 +18,10 @@ The connector is tested with the following converters
 - Kafka Offset management in ClickHouse
 - Increased parallelism (customizable thread pool for JDBC connections)
 
+
+
+### Grafana Dashboard
+![](doc/img/Grafana_dashboard.png) \
 # Source Databases
 - MySQL (Debezium)
 - PostgreSQL (Debezium) (Testing in progress)
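To see how these features surface in practice, here is a minimal sink-connector configuration sketch assembled from property names that appear in the deploy scripts changed below; the `connector.class` value, connector name, and topic are illustrative assumptions rather than values taken from this commit:

```json
{
  "name": "clickhouse-sink-connector",
  "config": {
    "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
    "topics": "SERVER5432.sbtest.sbtest1",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "auto.create.tables": true,
    "schema.evolution": false,
    "deduplication.policy": "off",
    "replacingmergetree.delete.column": "sign"
  }
}
```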
4 changes: 2 additions & 2 deletions deploy/debezium-connector-setup-schema-registry.sh
@@ -138,9 +138,9 @@ else
 "key.converter.schema.registry.url": "http://schemaregistry:8081",
 "value.converter.schema.registry.url":"http://schemaregistry:8081",
-"topic.creation.$alias.partitions": 6,
+"topic.creation.$alias.partitions": 1,
 "topic.creation.default.replication.factor": 1,
-"topic.creation.default.partitions": 6,
+"topic.creation.default.partitions": 1,
 "provide.transaction.metadata": "true"
 }
35 changes: 22 additions & 13 deletions ...er-compose-confluent-schema-registry.yaml → ...ker-compose-apicurio-schema-registry.yaml
100755 → 100644
@@ -79,18 +79,18 @@ services:
 
   schemaregistry:
     container_name: schemaregistry
-    #image: apicurio/apicurio-registry-mem:latest-release
-    image: confluentinc/cp-schema-registry:latest
+    image: apicurio/apicurio-registry-mem:latest-release
+    #image: confluentinc/cp-schema-registry:latest
     restart: "no"
     ports:
-      - "8081:8081"
-    #environment:
-    # - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - "8080:8080"
     environment:
-      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
-      - SCHEMA_REGISTRY_HOST_NAME=schemaregistry
-      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
-      - SCHEMA_REGISTRY_DEBUG=true
+      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+    # environment:
+    # - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
+    # - SCHEMA_REGISTRY_HOST_NAME=localhost
+    # - SCHEMA_REGISTRY_LISTENERS=http://localhost:8081
+    # - SCHEMA_REGISTRY_DEBUG=true
 
     depends_on:
       - kafka
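A quick way to confirm the Apicurio registry is reachable on its new port is to query the v2 REST API that the converter settings below point at; a hypothetical smoke test, assuming the port mapping above:

```bash
# List artifacts registered in Apicurio (Registry v2 REST API)
curl -s http://localhost:8080/apis/registry/v2/search/artifacts
```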
@@ -101,7 +101,7 @@ services:
     build:
       context: ../../docker/debezium_jmx
       args:
-        DEBEZIUM_VERSION: 1.9.5.Final
+        DEBEZIUM_VERSION: 2.0
         JMX_AGENT_VERSION: 0.15.0
     restart: "no"
     ports:
@@ -120,8 +120,17 @@
      - OFFSET_STORAGE_TOPIC=offset-storage-topic-debezium
      - STATUS_STORAGE_TOPIC=status-storage-topic-debezium
      - LOG_LEVEL=INFO
-     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
-     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
+     - ENABLE_APICURIO_CONVERTERS=true
+     - KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
+     - VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
+     - CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
+     - CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://schemaregistry:8080/apis/registry/v2
+     - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
+     - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
+     - CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
+     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://schemaregistry:8080/apis/registry/v2
+     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
+     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
      - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
      - JMXHOST=localhost
      - JMXPORT=1976
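As a reading aid: Debezium's container images conventionally map `CONNECT_*` environment variables to Kafka Connect worker properties by stripping the prefix, lowercasing, and converting `_` to `.` (hyphens pass through). Under that assumption, the block above should resolve to roughly:

```properties
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://schemaregistry:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
key.converter.apicurio.registry.find-latest=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://schemaregistry:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=true
```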
@@ -274,7 +283,7 @@ services:
     build:
       context: ../../docker/grafana
       args:
-        GRAFANA_VERSION: latest
+        GRAFANA_VERSION: 7.5.5
     #container_name: grafana
     #image: grafana/grafana
     restart: "no"
35 changes: 13 additions & 22 deletions deploy/docker/docker-compose.yaml
100644 → 100755
@@ -79,18 +79,18 @@ services:
 
   schemaregistry:
     container_name: schemaregistry
-    image: apicurio/apicurio-registry-mem:latest-release
-    #image: confluentinc/cp-schema-registry:latest
+    #image: apicurio/apicurio-registry-mem:latest-release
+    image: confluentinc/cp-schema-registry:latest
     restart: "no"
     ports:
-      - "8080:8080"
+      - "8081:8081"
+    #environment:
+    # - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
     environment:
-      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
-    # environment:
-    # - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
-    # - SCHEMA_REGISTRY_HOST_NAME=localhost
-    # - SCHEMA_REGISTRY_LISTENERS=http://localhost:8081
-    # - SCHEMA_REGISTRY_DEBUG=true
+      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
+      - SCHEMA_REGISTRY_HOST_NAME=schemaregistry
+      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
+      - SCHEMA_REGISTRY_DEBUG=true
 
     depends_on:
      - kafka
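Conversely, with the Confluent registry active here on 8081, a comparable smoke test (hypothetical) would be:

```bash
# List subjects registered in the Confluent schema registry
curl -s http://localhost:8081/subjects
```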
@@ -101,7 +101,7 @@ services:
     build:
       context: ../../docker/debezium_jmx
       args:
-        DEBEZIUM_VERSION: 2.0
+        DEBEZIUM_VERSION: 1.9.5.Final
         JMX_AGENT_VERSION: 0.15.0
     restart: "no"
     ports:
@@ -120,17 +120,8 @@
      - OFFSET_STORAGE_TOPIC=offset-storage-topic-debezium
      - STATUS_STORAGE_TOPIC=status-storage-topic-debezium
      - LOG_LEVEL=INFO
-     - ENABLE_APICURIO_CONVERTERS=true
-     - KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
-     - VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
-     - CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
-     - CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://schemaregistry:8080/apis/registry/v2
-     - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
-     - CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
-     - CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
-     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://schemaregistry:8080/apis/registry/v2
-     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
-     - CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
+     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
+     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
      - JMXHOST=localhost
      - JMXPORT=1976
@@ -283,7 +274,7 @@ services:
     build:
       context: ../../docker/grafana
       args:
-        GRAFANA_VERSION: 7.5.5
+        GRAFANA_VERSION: latest
     #container_name: grafana
     #image: grafana/grafana
     restart: "no"
2 changes: 1 addition & 1 deletion deploy/sink-connector-setup-schema-registry.sh
@@ -115,7 +115,7 @@ else
 "replacingmergetree.delete.column": "sign",
-"auto.create.tables": false,
+"auto.create.tables": true,
 "schema.evolution": false,
 "deduplication.policy": "off"
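With `auto.create.tables` now defaulting to `true`, re-running this script registers the sink with table auto-creation enabled. For orientation, scripts like this one typically post their JSON payload to the Kafka Connect REST API; a sketch in which the worker address, port 8083, and payload file name are assumptions, not values taken from this commit:

```bash
# Register (or re-register) the sink connector with the Connect worker
curl -X POST -H "Content-Type: application/json" \
     --data @sink-connector-config.json \
     http://localhost:8083/connectors
```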
2 changes: 1 addition & 1 deletion deploy/sink-connector-setup-sysbench.sh
@@ -95,7 +95,7 @@ else
 "value.converter.schema.registry.url":"http://schemaregistry:8081",
 "store.kafka.metadata": true,
-"topic.creation.default.partitions": 6,
+"topic.creation.default.partitions": 1,
 "store.raw.data": false,
 "store.raw.data.column": "raw_data",
9 changes: 7 additions & 2 deletions deploy/sysbench/compare_mysql_ch.sh
@@ -4,7 +4,12 @@
 rm -fr MySQL.tsv
 rm -fr CH.tsv
 
-docker exec -it clickhouse clickhouse-client -uroot --password root --query "select id ,k from test.sbtest1 where sign !=-1 order by id format TSV" | grep -v "<jemalloc>" >CH.tsv
-docker exec -it mysql-master mysql -uroot -proot -B -N -e "select * from sbtest.sbtest1 order by id" | grep -v "Using a password on the command line interface" >MySQL.tsv
+if [[ $1 == "bulk_insert" ]]; then
+  docker exec -it clickhouse clickhouse-client -uroot --password root --query "select id ,k from test.sbtest1 where sign !=-1 order by id format TSV" | grep -v "<jemalloc>" >CH.tsv
+else
+  docker exec -it clickhouse clickhouse-client -uroot --password root --query "select id ,k, c, pad from test.sbtest1 where sign !=-1 order by id format TSV" | grep -v "<jemalloc>" >CH.tsv
+
+fi
+docker exec -it mysql-master mysql -uroot -proot -B -N -e "select * from sbtest.sbtest1 order by id" | grep -v "Using a password on the command line interface" >MySQL.tsv
 
 diff --strip-trailing-cr MySQL.tsv CH.tsv
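The new branch keys off the script's first argument, so usage presumably looks like:

```bash
# Bulk-insert benchmark: compare only the id and k columns
./compare_mysql_ch.sh bulk_insert

# Default: compare all sysbench columns (id, k, c, pad)
./compare_mysql_ch.sh
```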
Binary file added doc/img/Grafana_dashboard.png
Binary file added doc/img/replacingmergetree_update_delete.jpg
Binary file added doc/img/sink_connector_mysql_architecture.jpg
4 changes: 2 additions & 2 deletions doc/mutable_data.md
@@ -9,6 +9,8 @@ For inserts, record will be inserted with `sign` set to `1`
 For updates, `before` value will be inserted with `sign` set to `-1`
 and `after` value will be inserted with `sign` set to `1`
 
+![](img/replacingmergetree_update_delete.jpg) \
+
 When `optimize table <table_name> final` or `select ... final` is performed, and when merges are performed by
 ClickHouse in the background, the initial insert record will be merged along with the `before` record.
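To make the `sign` mechanics concrete, here is a minimal hand-run ClickHouse SQL sketch; the table layout is illustrative (loosely modeled on the sysbench table used elsewhere in this repo), not the DDL the connector generates:

```sql
CREATE TABLE test.example
(
    id   Int32,
    k    Int32,
    sign Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

-- The initial insert arrives with sign = 1
INSERT INTO test.example VALUES (1, 10, 1);

-- An update arrives as a (before, after) pair
INSERT INTO test.example VALUES (1, 10, -1); -- before image, cancels the insert
INSERT INTO test.example VALUES (1, 20, 1);  -- after image, the new row state

-- FINAL collapses the +1/-1 pair, leaving only the after image
SELECT * FROM test.example FINAL;
```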

@@ -18,8 +20,6 @@ Non Primary key updates create a record with operation as 'u'
SinkRecord{kafkaOffset=62984, timestampType=CreateTime} ConnectRecord{topic='SERVER5432.sbtest.sbtest1', kafkaPartition=0, key=Struct{id=2317,k=3739}, keySchema=Schema{SERVER5432.sbtest.sbtest1.Key:STRUCT}, value=Struct{before=Struct{id=2317,k=3739,c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083028,pad=18846546959-44726413785-66695616247-63594911107-83062207348},after=Struct{id=2317,k=3739,c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083029,pad=18846546959-44726413785-66695616247-63594911107-83062207348},source=Struct{version=1.9.2.Final,connector=mysql,name=SERVER5432,ts_ms=1657658606000,snapshot=false,db=sbtest,table=sbtest1,server_id=842,file=mysql-bin.000003,pos=16210729,row=0,thread=22},op=u,ts_ms=1657658606611,transaction=Struct{id=file=mysql-bin.000003,pos=16210580,total_order=1,data_collection_order=1}}, valueSchema=Schema{SERVER5432.sbtest.sbtest1.Envelope:STRUCT}, timestamp=1657658607050, headers=ConnectHeaders(headers=)}
```


### Updates on Primary Key: Debezium

Debezium handles updates on Primary key in the same way as Primary Key changes.
34 changes: 32 additions & 2 deletions doc/setup.md
@@ -26,14 +26,23 @@ It will start:
 4. RedPanda
 5. clickhouse-kafka-sink-connector
 6. Clickhouse
+7. Confluent Schema registry or Apicurio Schema registry
 
 The `start-docker-compose.sh` script uses the `latest` tag by default; you can also pass a specific docker tag to the script, as sketched below.
 Altinity sink images are tagged on every successful build in the format `yyyy-mm-dd`, for example `2022-07-19`.
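A hypothetical invocation with a pinned tag (assuming the script forwards its first argument as the image tag, which this commit does not show):

```bash
cd deploy/docker
./start-docker-compose.sh 2022-07-19
```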

+### MySQL:
 ```bash
 cd deploy/docker
 ./start-docker-compose.sh
 ```
 
+### Postgres:
+```bash
+cd deploy/docker
+docker-compose -f docker-compose-postgresql.yaml up
+```

 ### Start Docker-compose with a specific docker tag
 ```bash
 cd deploy/docker
@@ -48,14 +57,35 @@ cd deploy/docker
 # Source connector
 After all the docker containers are up and running, execute the following command
 to create the Debezium MySQL connector.
-Make sure MySQL master/slave is up and running before executing the following script.
 
+Make sure MySQL master/slave is up and running before executing the following script.\
+
+### MySQL:
+```bash
+../deploy/debezium-connector-setup-schema-registry.sh
+```
 [debezium-connector-setup-schema-registry.sh](../deploy/debezium-connector-setup-schema-registry.sh)
 
+### Postgres(Using Apicurio):
+```bash
+../deploy/debezium-connector-setup-schema-registry.sh postgres apicurio
+```
+
 # Sink Connector
 After the source connector is created,
 execute the script [sink-connector-setup-schema-registry.sh](../deploy/sink-connector-setup-schema-registry.sh)
 to create the ClickHouse sink connector using the Kafka Connect REST API.
 
+### MySQL:
+```bash
+../deploy/sink-connector-setup-schema-registry.sh
+```
+### Postgres(Using Apicurio):
+```bash
+../deploy/sink-connector-setup-schema-registry.sh postgres apicurio
+```
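Once both connectors are registered, their state can be verified through the same Kafka Connect REST API referenced below; a sketch in which the connector names and the worker port 8083 are assumptions:

```bash
# Confirm the source and sink connectors are RUNNING
curl -s http://localhost:8083/connectors/debezium-mysql-connector/status
curl -s http://localhost:8083/connectors/clickhouse-sink-connector/status
```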


 # Deleting connectors
 The source connector can be deleted using the following script
 [debezium-delete.sh](../deploy/debezium-delete.sh)
@@ -66,7 +96,7 @@ The sink connector can be deleted using the following script
 # References
 Kafka Connect REST API - (https://docs.confluent.io/platform/current/connect/references/restapi.html)
 
-[docker-compose.yaml]: ../deploy/docker/docker-compose.yaml
+[docker-compose.yaml]: ../deploy/docker/docker-compose-apicurio-schema-registry.yaml
 [Dockerfile]: ../docker/Dockerfile-sink-on-debezium-base-image
DbWriter.java
@@ -86,10 +86,12 @@ public DbWriter(
         MutablePair<DBMetadata.TABLE_ENGINE, String> response = metadata.getTableEngine(this.conn, database, tableName);
         this.engine = response.getLeft();
 
+        long taskId = this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID);
+
         //ToDO: Is this a reliable way of checking if the table exists already.
         if (this.engine == null) {
             if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES)) {
-                log.info("**** AUTO CREATE TABLE " + tableName);
+                log.info(String.format("**** Task(%s), AUTO CREATE TABLE (%s) *** ",taskId, tableName));
                 ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
                 try {
                     act.createNewTable(record.getPrimaryKey(), tableName, record.getAfterStruct().schema().fields().toArray(new Field[0]), this.conn);
@@ -99,6 +101,8 @@ public DbWriter(
                 } catch (Exception e) {
                     log.error("**** Error creating table ***" + tableName, e);
                 }
+            } else {
+                log.error("********* AUTO CREATE DISABLED, Table does not exist, please enable it by setting auto.create.tables=true");
             }
         }
 
@@ -113,6 +117,16 @@ public DbWriter(
         this.replacingMergeTreeDeleteColumn = this.config.getString(ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN);
     }
 
+    public boolean wasTableMetaDataRetrieved() {
+        boolean result = true;
+
+        if(this.engine == null || this.columnNameToDataTypeMap == null || this.columnNameToDataTypeMap.isEmpty()) {
+            result = false;
+        }
+
+        return result;
+    }
+
     /**
      * Function to check if the column is of DateTime64
      * from the column type(string name)
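The new guard is presumably meant to be checked before a batch is flushed; a hypothetical caller (not part of this commit) might use it like this:

```java
// Hypothetical caller sketch: if table metadata could not be retrieved yet
// (e.g. auto-create has not succeeded), leave the records queued so the
// next poll retries them instead of silently dropping the batch.
if (!dbWriter.wasTableMetaDataRetrieved()) {
    log.warn("Table metadata not available for " + tableName + ", retrying batch on next poll");
    return;
}
```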
@@ -235,8 +249,10 @@ public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<Cli
 
         boolean enableSchemaEvolution = this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_SCHEMA_EVOLUTION);
 
+        boolean result = false;
+
         if(CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
-            updateQueryToRecordsMap(record, record.getBeforeModifiedFields(), queryToRecordsMap);
+            result = updateQueryToRecordsMap(record, record.getBeforeModifiedFields(), queryToRecordsMap);
         } else if(CdcRecordState.CDC_RECORD_STATE_AFTER == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
             if(enableSchemaEvolution) {
                 try {
@@ -248,21 +264,23 @@
                 }
             }
 
-            updateQueryToRecordsMap(record, record.getAfterModifiedFields(), queryToRecordsMap);
+            result = updateQueryToRecordsMap(record, record.getAfterModifiedFields(), queryToRecordsMap);
         } else if(CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
-            updateQueryToRecordsMap(record, record.getBeforeModifiedFields(), queryToRecordsMap);
-            updateQueryToRecordsMap(record, record.getAfterModifiedFields(), queryToRecordsMap);
+            result = updateQueryToRecordsMap(record, record.getBeforeModifiedFields(), queryToRecordsMap);
+            result = updateQueryToRecordsMap(record, record.getAfterModifiedFields(), queryToRecordsMap);
         } else {
             log.error("INVALID CDC RECORD STATE");
         }
 
         // Remove the record from shared records.
-        iterator.remove();
+        if(result) {
+            iterator.remove();
+        }
         }
         return partitionToOffsetMap;
     }

-    public void updateQueryToRecordsMap(ClickHouseStruct record, List<Field> modifiedFields,
+    public boolean updateQueryToRecordsMap(ClickHouseStruct record, List<Field> modifiedFields,
                                         Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap) {
         MutablePair<String, Map<String, Integer>> response= new QueryFormatter().getInsertQueryUsingInputFunction
                 (this.tableName, modifiedFields, this.columnNameToDataTypeMap,
@@ -272,9 +290,9 @@ public void updateQueryToRecordsMap(ClickHouseStruct record, List<Field> modifie
                  this.signColumn, this.versionColumn, this.replacingMergeTreeDeleteColumn, this.engine);
 
         String insertQueryTemplate = response.getKey();
-        if(response.getValue() == null) {
-            log.error("********* COLUMN TO INDEX MAP EMPTY");
-            return;
+        if(response.getKey() == null || response.getValue() == null) {
+            log.error("********* QUERY or COLUMN TO INDEX MAP EMPTY");
+            return false;
             // this.columnNametoIndexMap = response.right;
         }
 
@@ -293,6 +311,8 @@ public void updateQueryToRecordsMap(ClickHouseStruct record, List<Field> modifie
 
             queryToRecordsMap.put(mp, recordsList);
         }
+
+        return true;
     }
 
     /**