From 319d291d63de4f3bfce3ff98e432c2b57d5eef8f Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 6 Jul 2024 11:34:09 -0400 Subject: [PATCH] Added integration test for MariaDB --- sink-connector-lightweight/pom.xml | 6 + .../debezium/embedded/ITCommon.java | 55 ++++++ .../debezium/embedded/MariaDBIT.java | 160 ++++++++++++++++++ .../src/test/resources/my.cnf | 43 +++++ 4 files changed, 264 insertions(+) create mode 100644 sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java create mode 100644 sink-connector-lightweight/src/test/resources/my.cnf diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml index 136498017..f8a0e2d64 100644 --- a/sink-connector-lightweight/pom.xml +++ b/sink-connector-lightweight/pom.xml @@ -250,6 +250,12 @@ ${version.testcontainers} test + + org.testcontainers + mariadb + ${version.testcontainers} + test + org.testcontainers mongodb diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java index 61b89c384..bcb2a007a 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java @@ -29,6 +29,23 @@ static public Connection connectToMySQL(MySQLContainer mySqlContainer) { return conn; } + static public Connection connectToMySQL(String host, String port, String databaseName, String userName, String password) { + Connection conn = null; + try { + + String connectionUrl = String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s", host, port, + databaseName, userName, password); + conn = DriverManager.getConnection(connectionUrl); + + + } catch (SQLException ex) { + // handle any errors + + } + + return conn; + } + // Function to connect to Postgres. static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLContainer) { Connection conn = null; @@ -45,6 +62,44 @@ static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLConta return conn; } + static public Properties getDebeziumProperties(String mySQLHost, String mySQLPort, ClickHouseContainer clickHouseContainer) throws Exception { + + // Start the debezium embedded application. + + Properties defaultProps = new Properties(); + Properties defaultProperties = PropertiesHelper.getProperties("config.properties"); + + defaultProps.putAll(defaultProperties); + Properties fileProps = new ConfigLoader().load("config.yml"); + defaultProps.putAll(fileProps); + + defaultProps.setProperty("database.hostname", mySQLHost); + defaultProps.setProperty("database.port", String.valueOf(mySQLPort)); + defaultProps.setProperty("database.user", "root"); + defaultProps.setProperty("database.password", "adminpass"); + + defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost()); + defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort())); + defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername()); + defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword()); + + defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s", + clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort())); + + + return defaultProps; + + } + static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, ClickHouseContainer clickHouseContainer) throws Exception { // Start the debezium embedded application. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java new file mode 100644 index 000000000..aec3c4bff --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java @@ -0,0 +1,160 @@ +package com.altinity.clickhouse.debezium.embedded; + +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.clickhouse.jdbc.ClickHouseConnection; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Integration test to validate support for replication of multiple databases. + */ +@Testcontainers +@DisplayName("Integration Test that validates basic replication of MariaDB databases") +public class MariaDBIT +{ + + protected MariaDBContainer mySqlContainer; + static ClickHouseContainer clickHouseContainer; + + @BeforeEach + public void startContainers() throws InterruptedException { + mySqlContainer = (MariaDBContainer) new MariaDBContainer() + .withDatabaseName("employees").withUsername("adminuser").withPassword("adminpass") + // .withInitScript("data_types.sql") + .withCopyFileToContainer( + MountableFile.forClasspathResource("my.cnf"), // Adjust this to your resource file + "/etc/mysql/my.cnf" + ) + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + Thread.sleep(15000); + } + + static { + clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + clickHouseContainer.start(); + } + + @DisplayName("Integration Test that validates handle of JSON data type from MySQL") + @Test + public void testMultipleDatabases() throws Exception { + + AtomicReference engine = new AtomicReference<>(); + + Properties props = ITCommon.getDebeziumProperties(mySqlContainer.getHost(), + String.valueOf(mySqlContainer.getFirstMappedPort()), clickHouseContainer); + // Set the list of databases captured. + props.put("database.whitelist", "employees,test_db,test_db2"); + props.put("database.include.list", "employees,test_db,test_db2"); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(props, new SourceRecordParserService(), + new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(30000); + Connection conn = ITCommon.connectToMySQL(mySqlContainer.getHost(), String.valueOf(mySqlContainer.getFirstMappedPort()), + mySqlContainer.getDatabaseName(), mySqlContainer.getUsername(), mySqlContainer.getPassword()); + + //conn.createStatement().execute("CREATE DATABASE test_db2"); + Thread.sleep(5000); + // Create a new database + conn.createStatement().execute("CREATE TABLE employees.audience (" + + "id int unsigned NOT NULL AUTO_INCREMENT, " + + "client_id int unsigned NOT NULL, " + + "list_id int unsigned NOT NULL, " + + "status tinyint NOT NULL, " + + "email varchar(200) CHARACTER SET utf16 COLLATE utf16_unicode_ci NOT NULL, " + + "custom_properties JSON, " + + "source tinyint unsigned NOT NULL DEFAULT '0', " + + "created_date datetime DEFAULT NULL, " + + "modified_date datetime DEFAULT NULL, " + + "property_update_date datetime DEFAULT NULL, " + + "PRIMARY KEY (id), " + + "KEY cid_email (client_id,email), " + + "KEY cid (client_id,list_id,status), " + + "KEY contact_created (created_date), " + + "KEY idx_email (email)" + + ") ENGINE=InnoDB CHARSET=utf16 COLLATE=utf16_unicode_ci"); + + + Thread.sleep(5000); + // Insert a new row. + conn.createStatement().execute("INSERT INTO employees.audience (client_id, list_id, status, email, custom_properties, source, created_date, modified_date, property_update_date)" + + " VALUES (1, 100, 1, 'example@example.com', '{\"name\": \"John\", \"age\": 30}', 1, '2024-05-13 12:00:00', '2024-05-13 12:00:00', '2024-05-13 12:00:00')"); + + Thread.sleep(10000); + conn.close(); + + // Create connection to clickhouse and validate if the tables are replicated. + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "system"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + // query clickhouse connection and get data for test_table1 and test_table2 + + + ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience"); + // Validate the data + boolean recordFound = false; + while(rs.next()) { + recordFound = true; + assert rs.getInt("id") == 1; + //assert rs.getString("name").equalsIgnoreCase("test"); + } + Assert.assertTrue(recordFound); + + + + if(engine.get() != null) { + engine.get().stop(); + } + // Files.deleteIfExists(tmpFilePath); + executorService.shutdown(); + + writer.getConnection().close(); + + + } +} diff --git a/sink-connector-lightweight/src/test/resources/my.cnf b/sink-connector-lightweight/src/test/resources/my.cnf new file mode 100644 index 000000000..0e745dabe --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/my.cnf @@ -0,0 +1,43 @@ +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/8.2/en/server-configuration-defaults.html + +# -------------------------------------------------------------------------------------------- +# This section specifies 5.5 and cross-version common configurations +# -------------------------------------------------------------------------------------------- +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +#secure-file-priv=/var/lib/mysql-files +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 112233 +log_bin = mysql-bin +binlog_format = row +log_bin_compress = off \ No newline at end of file