Skip to content

Commit

Permalink
Merge pull request #673 from Altinity/628-add-integration-test-for-ma…
Browse files Browse the repository at this point in the history
…riadb

628 add integration test for mariadb
  • Loading branch information
subkanthi authored Aug 25, 2024
2 parents 237a411 + 319d291 commit 8c5b04c
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 0 deletions.
6 changes: 6 additions & 0 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ static public Connection connectToMySQL(MySQLContainer mySqlContainer) {
return conn;
}

static public Connection connectToMySQL(String host, String port, String databaseName, String userName, String password) {
Connection conn = null;
try {

String connectionUrl = String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s", host, port,
databaseName, userName, password);
conn = DriverManager.getConnection(connectionUrl);


} catch (SQLException ex) {
// handle any errors

}

return conn;
}

// Function to connect to Postgres.
static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLContainer) throws SQLException {
Connection conn = null;
Expand All @@ -41,6 +58,44 @@ static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLConta
return conn;
}

static public Properties getDebeziumProperties(String mySQLHost, String mySQLPort, ClickHouseContainer clickHouseContainer) throws Exception {

// Start the debezium embedded application.

Properties defaultProps = new Properties();
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");

defaultProps.putAll(defaultProperties);
Properties fileProps = new ConfigLoader().load("config.yml");
defaultProps.putAll(fileProps);

defaultProps.setProperty("database.hostname", mySQLHost);
defaultProps.setProperty("database.port", String.valueOf(mySQLPort));
defaultProps.setProperty("database.user", "root");
defaultProps.setProperty("database.password", "adminpass");

defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));


return defaultProps;

}

static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, ClickHouseContainer clickHouseContainer) throws Exception {

// Start the debezium embedded application.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/**
* Integration test to validate support for replication of multiple databases.
*/
@Testcontainers
@DisplayName("Integration Test that validates basic replication of MariaDB databases")
public class MariaDBIT
{

protected MariaDBContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = (MariaDBContainer) new MariaDBContainer()
.withDatabaseName("employees").withUsername("adminuser").withPassword("adminpass")
// .withInitScript("data_types.sql")
.withCopyFileToContainer(
MountableFile.forClasspathResource("my.cnf"), // Adjust this to your resource file
"/etc/mysql/my.cnf"
)
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
Thread.sleep(15000);
}

static {
clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

clickHouseContainer.start();
}

@DisplayName("Integration Test that validates handle of JSON data type from MySQL")
@Test
public void testMultipleDatabases() throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

Properties props = ITCommon.getDebeziumProperties(mySqlContainer.getHost(),
String.valueOf(mySqlContainer.getFirstMappedPort()), clickHouseContainer);
// Set the list of databases captured.
props.put("database.whitelist", "employees,test_db,test_db2");
props.put("database.include.list", "employees,test_db,test_db2");

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(30000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer.getHost(), String.valueOf(mySqlContainer.getFirstMappedPort()),
mySqlContainer.getDatabaseName(), mySqlContainer.getUsername(), mySqlContainer.getPassword());

//conn.createStatement().execute("CREATE DATABASE test_db2");
Thread.sleep(5000);
// Create a new database
conn.createStatement().execute("CREATE TABLE employees.audience ("
+ "id int unsigned NOT NULL AUTO_INCREMENT, "
+ "client_id int unsigned NOT NULL, "
+ "list_id int unsigned NOT NULL, "
+ "status tinyint NOT NULL, "
+ "email varchar(200) CHARACTER SET utf16 COLLATE utf16_unicode_ci NOT NULL, "
+ "custom_properties JSON, "
+ "source tinyint unsigned NOT NULL DEFAULT '0', "
+ "created_date datetime DEFAULT NULL, "
+ "modified_date datetime DEFAULT NULL, "
+ "property_update_date datetime DEFAULT NULL, "
+ "PRIMARY KEY (id), "
+ "KEY cid_email (client_id,email), "
+ "KEY cid (client_id,list_id,status), "
+ "KEY contact_created (created_date), "
+ "KEY idx_email (email)"
+ ") ENGINE=InnoDB CHARSET=utf16 COLLATE=utf16_unicode_ci");


Thread.sleep(5000);
// Insert a new row.
conn.createStatement().execute("INSERT INTO employees.audience (client_id, list_id, status, email, custom_properties, source, created_date, modified_date, property_update_date)" +
" VALUES (1, 100, 1, 'example@example.com', '{\"name\": \"John\", \"age\": 30}', 1, '2024-05-13 12:00:00', '2024-05-13 12:00:00', '2024-05-13 12:00:00')");

Thread.sleep(10000);
conn.close();

// Create connection to clickhouse and validate if the tables are replicated.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
// query clickhouse connection and get data for test_table1 and test_table2


ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");
// Validate the data
boolean recordFound = false;
while(rs.next()) {
recordFound = true;
assert rs.getInt("id") == 1;
//assert rs.getString("name").equalsIgnoreCase("test");
}
Assert.assertTrue(recordFound);



if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

writer.getConnection().close();


}
}
43 changes: 43 additions & 0 deletions sink-connector-lightweight/src/test/resources/my.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/8.2/en/server-configuration-defaults.html

# --------------------------------------------------------------------------------------------
# This section specifies 5.5 and cross-version common configurations
# --------------------------------------------------------------------------------------------
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 112233
log_bin = mysql-bin
binlog_format = row
log_bin_compress = off

0 comments on commit 8c5b04c

Please sign in to comment.