diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 62853496f..ed2324581 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -2,7 +2,6 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; @@ -17,7 +16,6 @@ import com.altinity.clickhouse.sink.connector.executor.DebeziumOffsetManagement; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.altinity.clickhouse.sink.connector.model.DBCredentials; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.debezium.config.CommonConnectorConfig; @@ -198,7 +196,8 @@ private String getDatabaseName(SourceRecord sr) { private BaseDbWriter createWriter(ClickHouseSinkConnectorConfig config, String databaseName) { DBCredentials dbCredentials = parseDBConfiguration(config); String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", dbCredentials.getUserName(), dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, dbCredentials.getUserName(), dbCredentials.getPassword(), + databaseName, config); return new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); } @@ -384,10 +383,11 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf DBCredentials dbCredentials = parseDBConfiguration(config); String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), - "system"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config); + BaseDbWriter.SYSTEM_DB); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config); BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - "system", dbCredentials.getUserName(), + BaseDbWriter.SYSTEM_DB, dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); Pair tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props); @@ -395,7 +395,7 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf String createDbQuery = String.format("create database if not exists %s", databaseName); log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery); - writer.executeQuery(createDbQuery); + writer.executeSystemQuery(createDbQuery); break; } catch (Exception e) { @@ -423,13 +423,14 @@ private void createSchemaHistoryTable(ClickHouseSinkConnectorConfig config, Prop DBCredentials dbCredentials = parseDBConfiguration(config); String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), "system"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config); BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), "system", dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); try { - writer.executeQuery(createSchemaHistoryTable); + writer.executeSystemQuery(createSchemaHistoryTable); } catch(Exception e) { log.error("Error creating schema history table", e); } @@ -450,7 +451,8 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), "system"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config); BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), "system", dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); @@ -463,7 +465,7 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config // Remove quotes. formattedView = formattedView.replace("\"", ""); try { - writer.executeQuery(formattedView); + writer.executeSystemQuery(formattedView); } catch(Exception e) { log.error("**** Error creating VIEW **** " + formattedView); } @@ -540,8 +542,9 @@ public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pro log.error("**** Connection to ClickHouse is not established, re-initiating ****"); String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - dbCredentials.getUserName(), dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + dbCredentials.getUserName(), dbCredentials.getPassword(), + BaseDbWriter.SYSTEM_DB, config); writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java index 903f92878..8c2a85544 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java @@ -51,7 +51,7 @@ public void deleteOffsetStorageRow(String offsetKey, // String connectorName = config.getString("connector.name"); String debeziumStorageStatusQuery = String.format("delete from %s where offset_key='%s'" , tableName, offsetKey); - writer.executeQuery(debeziumStorageStatusQuery); + writer.executeSystemQuery(debeziumStorageStatusQuery); } /** @@ -67,7 +67,7 @@ public void deleteSchemaHistoryTable(String offsetKey, String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey); log.info("Deleting schema history table query: " + debeziumStorageStatusQuery); - writer.executeQuery(debeziumStorageStatusQuery); + writer.executeSystemQuery(debeziumStorageStatusQuery); } /** * Function to get the latest timestamp of the record in the table @@ -81,7 +81,7 @@ public String getDebeziumLatestRecordTimestamp(Properties props, BaseDbWriter wr JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); String debeziumLatestRecordTimestampQuery = String.format("select max(record_insert_ts) from %s" , tableName); - return writer.executeQuery(debeziumLatestRecordTimestampQuery); + return writer.executeSystemQuery(debeziumLatestRecordTimestampQuery); } public String getDebeziumStorageStatusQuery( @@ -92,7 +92,7 @@ public String getDebeziumStorageStatusQuery( String offsetKey = getOffsetKey(props); // String connectorName = config.getString("connector.name"); String debeziumStorageStatusQuery = String.format("select offset_val from %s where offset_key='%s'" , tableName, offsetKey); - return writer.executeQuery(debeziumStorageStatusQuery); + return writer.executeSystemQuery(debeziumStorageStatusQuery); } /** diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java index bf12ae1fc..01a14f223 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT.java @@ -91,13 +91,8 @@ public void testDecoderBufsPlugin() throws Exception { Thread.sleep(10000);// Thread.sleep(50000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); Assert.assertTrue(tmColumns.size() == 22); Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java index 2a50e1bf9..569d0ad42 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java @@ -95,13 +95,8 @@ public void testPgOutputPlugin() throws Exception { // Create connection. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); Assert.assertTrue(tmColumns.size() == 22); Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java index fc7a82d38..9d7a16955 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ITCommon.java @@ -2,6 +2,9 @@ import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper; import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; + import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.*; @@ -9,7 +12,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; - +import java.util.HashMap; public class ITCommon { static public Connection connectToMySQL(MySQLContainer mySqlContainer) { Connection conn = null; @@ -196,4 +199,18 @@ static public Properties getDebeziumPropertiesForSchemaOnly(MySQLContainer mySql props.setProperty("replica.status.view", "CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s settings final=1"); return props; } + + + static public BaseDbWriter getDBWriter(ClickHouseContainer clickHouseContainer) { + + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees"); + Connection connection = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), + clickHouseContainer.getPassword(), BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); + + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + + return writer; + } } \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java index a734e15d6..4940dcdf5 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MariaDBIT.java @@ -126,15 +126,7 @@ public void testMultipleDatabases() throws Exception { conn.close(); // Create connection to clickhouse and validate if the tables are replicated. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); - // query clickhouse connection and get data for test_table1 and test_table2 - + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience"); // Validate the data diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java index 58a63c3dd..bb63bd8b2 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLGenerateColumnsTest.java @@ -97,11 +97,7 @@ public void testMySQLGeneratedColumns() throws Exception { conn.prepareStatement("insert into contacts(first_name, last_name, email) values('John', 'Doe', 'john.doe@gmail.com')").execute(); Thread.sleep(20000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Map columnsToDataTypeMap = writer.getColumnsDataTypesForTable("contacts"); Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java index 782d241e9..c4fd6d84c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/MySQLJsonIT.java @@ -118,13 +118,7 @@ public void testMultipleDatabases() throws Exception { conn.close(); // Create connection to clickhouse and validate if the tables are replicated. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // query clickhouse connection and get data for test_table1 and test_table2 diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java index c8419da2d..d9986fc56 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/OffsetManagementIT.java @@ -20,7 +20,6 @@ import org.testcontainers.utility.DockerImageName; import java.sql.Connection; -import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java index 971916b71..ef1c2ca31 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerIT.java @@ -92,13 +92,8 @@ public void testDecoderBufsPlugin() throws Exception { Thread.sleep(10000);// Thread.sleep(50000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); Assert.assertTrue(tmColumns.size() == 22); Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java index cbb9e6db2..c2f4114c4 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresInitialDockerWKeeperMapStorageIT.java @@ -2,25 +2,23 @@ import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumOffsetStorage; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.Testcontainers; import org.testcontainers.clickhouse.ClickHouseContainer; -import org.testcontainers.containers.*; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; -import java.sql.Connection; import java.sql.ResultSet; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -115,13 +113,8 @@ public void testDecoderBufsPlugin() throws Exception { Thread.sleep(10000);// Thread.sleep(50000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); Assert.assertTrue(tmColumns.size() == 22); Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java index 72902a4fc..2346a72d9 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/PostgresPgoutputMultipleSchemaIT.java @@ -103,14 +103,8 @@ public void testMultipleSchemaReplication() throws Exception { Thread.sleep(10000); - // Create connection. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map tmColumns = writer.getColumnsDataTypesForTable("tm"); Assert.assertTrue(tmColumns.size() == 22); Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID")); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java index 57f47eba2..ba0174674 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTClickHouse22TIT.java @@ -101,10 +101,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E Thread.sleep(30000); Connection conn = ITCommon.connectToMySQL(mySqlContainer); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4"); // Validate that all the tables are created. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java index 2d717f1bf..97db58644 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLClickHouse22TIT.java @@ -103,10 +103,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E Thread.sleep(30000); Connection conn = ITCommon.connectToMySQL(mySqlContainer); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4"); // Validate that all the tables are created. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java index b2ef6d54a..27a01fb7c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTDDLIT.java @@ -106,10 +106,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E Thread.sleep(30000); Connection conn = ITCommon.connectToMySQL(mySqlContainer); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4"); // Validate that all the tables are created. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java index 41b5b85d1..c548cb1c7 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ReplicatedRMTIT.java @@ -104,10 +104,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E Thread.sleep(30000); Connection conn = ITCommon.connectToMySQL(mySqlContainer); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("show create table string_types_MEDIUMTEXT_utf8mb4"); // Validate that all the tables are created. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java index 76df53ec9..3b04b8e13 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java @@ -4,11 +4,9 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -103,8 +101,9 @@ public void testBatchRetryOnCHFailure() throws Exception { // Check if Batch was inserted. String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection chConn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), + "employees",new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java index 13922ad94..9e34236f8 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java @@ -4,11 +4,9 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -31,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumPropertiesForSchemaOnly; @Testcontainers @@ -95,13 +92,7 @@ public void testClickHouseDelayedStart() throws Exception { while (true) { // Check if Batch was inserted. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); try { ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME where Type = 'DATETIME-INSERT55'"); while (dateTimeResult.next()) { @@ -145,15 +136,7 @@ public void debeziumStorageView() throws Exception { Thread.sleep(10000); - // Connect to clickhouse and validate that the view was created successfully. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "altinity_sink_connector"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "altinity_sink_connector", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); - - + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // Check if the view altinity_sink_connector.show_replica_status was created successfully. ResultSet resultSet = writer.executeQueryWithResultSet("show create view altinity_sink_connector.show_replica_status"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java index 384fd8dc4..3c6c80068 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideIT.java @@ -4,11 +4,8 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -24,9 +21,7 @@ import org.testcontainers.utility.DockerImageName; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -117,12 +112,7 @@ public void testDatabaseOverride() throws Exception { Thread.sleep(10000); // Validate in Clickhouse the last record written is 29999 - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); long col2 = 0L; ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from employees2.newtable final where col1 = 'a'"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java index ccf5bc5f4..b6d3beeee 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideInitialIT.java @@ -4,11 +4,8 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -23,9 +20,7 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; -import java.sql.Connection; import java.sql.ResultSet; -import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -99,13 +94,7 @@ public void testDatabaseOverride() throws Exception { Thread.sleep(10000); - // Validate in Clickhouse the last record written is 29999 - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); long col2 = 0L; ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from employees2.newtable final where col1 = 'a'"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java index c3dd06e54..8c100da0f 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DatabaseOverrideRRMTIT.java @@ -5,10 +5,8 @@ import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -23,12 +21,10 @@ import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; import java.sql.Connection; import java.sql.ResultSet; -import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -84,15 +80,10 @@ public void startContainers() throws InterruptedException { @Test public void testDatabaseOverride() throws Exception { - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); - writer.executeQuery("CREATE DATABASE employees2"); - writer.executeQuery("CREATE DATABASE productsnew"); + writer.executeSystemQuery("CREATE DATABASE employees2"); + writer.executeSystemQuery("CREATE DATABASE productsnew"); Thread.sleep(10000); Injector injector = Guice.createInjector(new AppInjector()); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java index 5c7fd3516..1f61652cd 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/Debezium15KTablesLoadIT.java @@ -2,12 +2,10 @@ import com.altinity.clickhouse.debezium.embedded.AppInjector; import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; +import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -22,10 +20,8 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; -import java.sql.Connection; import java.sql.ResultSet; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -90,14 +86,7 @@ public void testLoadingTablesInSchemaOnlyMode() throws Exception { Thread.sleep(25000); - // Confirm if only the whitelisted tabes were replicated. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet dateTimeResult = writer.executeQueryWithResultSet("select name from system.tables where database='employees'"); boolean insertCheck = false; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java index ae3b78f16..a53f6cf01 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureIT.java @@ -4,24 +4,16 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; import com.altinity.clickhouse.sink.connector.model.DBCredentials; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.common.collect.Maps; import com.google.inject.Guice; import com.google.inject.Injector; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; - -import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; -import static org.junit.Assert.assertTrue; - import org.apache.log4j.BasicConfigurator; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -33,13 +25,16 @@ import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; -import java.sql.*; -import java.util.HashMap; +import java.sql.Connection; +import java.sql.ResultSet; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; +import static org.junit.Assert.assertTrue; + @Testcontainers public class DebeziumChangeEventCaptureIT{ @@ -58,14 +53,7 @@ public void testDeleteOffsetStorageRow2() { JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); DBCredentials dbCredentials = dec.parseDBConfiguration(config); - String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase()); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "Client_1", dbCredentials.getUserName(), - dbCredentials.getPassword(), config); - - BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase(), dbCredentials.getUserName(), - dbCredentials.getPassword(), config, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer); //String offsetKey = new DebeziumOffsetStorage().getOffsetKey(props); @@ -149,14 +137,7 @@ public void testIncrementingSequenceNumbers() throws Exception { Thread.sleep(20000); - // Create connection to ClickHouse and get the version numbers. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); long version1 = 1L; long version2 = 1L; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java index 1c26bce8f..9c346b006 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCaptureTest.java @@ -6,15 +6,14 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.json.simple.parser.ParseException; -import static org.junit.Assert.assertTrue; - -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; +import static org.junit.Assert.assertTrue; + public class DebeziumChangeEventCaptureTest { @Test diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java index 2d4d53065..2fb740405 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumStorageViewIT.java @@ -2,19 +2,15 @@ import com.altinity.clickhouse.debezium.embedded.AppInjector; import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; -import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.clickhouse.ClickHouseContainer; @@ -86,8 +82,9 @@ public void debeziumStorageView() throws Exception { // Connect to clickhouse and validate that the view was created successfully. String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "altinity_sink_connector"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection chConn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "altinity_sink_connector", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java index ed5d36460..1e72a7d60 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/DestinationDBColumnMissingIT.java @@ -4,11 +4,9 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.log4j.BasicConfigurator; @@ -33,7 +31,6 @@ import java.util.concurrent.Executors; import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties; -import static org.junit.Assert.assertTrue; @Testcontainers @Disabled @@ -107,8 +104,9 @@ public void testColumnMismatch() throws Exception { // Validate in Clickhouse the last record written is 29999 String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection chConn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), + "employees", new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java index 544573869..f0b1051a7 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/MultipleUpdatesWSameTimestampIT.java @@ -4,16 +4,10 @@ import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication; import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi; -import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService; import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService; -import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.altinity.clickhouse.sink.connector.model.DBCredentials; -import com.clickhouse.jdbc.ClickHouseConnection; -import com.google.common.collect.Maps; import com.google.inject.Guice; import com.google.inject.Injector; -import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; import org.apache.log4j.BasicConfigurator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -30,8 +24,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -127,13 +119,7 @@ public void testIncrementingSequenceNumberWithUpdates() throws Exception { Thread.sleep(10000); - // Validate in Clickhouse the last record written is 29999 - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); long col2 = 1L; ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java index 661a37d10..9676f8de6 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/SourceDBColumnMissingIT.java @@ -103,13 +103,7 @@ public void testColumnMismatch() throws Exception { Thread.sleep(10000); - // Validate in Clickhouse the last record written is 29999 - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); long col2 = 1L; ResultSet version1Result = writer.executeQueryWithResultSet("select col2 from newtable final where col1 = 'a'"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java index 4edeaf7e7..fed214959 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java @@ -98,13 +98,7 @@ public void testRestClient() throws Exception { Thread.sleep(10000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java index 93f20f948..16fbd40c5 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableAddColumnIT.java @@ -87,12 +87,8 @@ public void testAddColumn() throws Exception { Thread.sleep(25000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); - + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + Map shipClassColumns = writer.getColumnsDataTypesForTable("ship_class"); Map addTestColumns = writer.getColumnsDataTypesForTable("add_test"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java index 54ecb4e99..244a3f7c0 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableChangeColumnIT.java @@ -72,11 +72,7 @@ public void testChangeColumn() throws Exception { Thread.sleep(10000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Map shipClassColumns = writer.getColumnsDataTypesForTable("ship_class"); Map addTestColumns = writer.getColumnsDataTypesForTable("add_test"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java index 4215434df..e01639a20 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AlterTableModifyColumnIT.java @@ -68,10 +68,7 @@ public void testModifyColumn() throws Exception { Thread.sleep(15000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Map shipClassColumns = writer.getColumnsDataTypesForTable("ship_class"); Map addTestColumns = writer.getColumnsDataTypesForTable("add_test"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java index ae22ee63a..8c5a4cb61 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/AutoCreateTableIT.java @@ -90,14 +90,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception Thread.sleep(10000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); - + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Thread.sleep(10000); ResultSet dateTimeResult = writer.executeQueryWithResultSet("select count(*) from `new-table`"); boolean resultReceived = false; diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java index 899e712bd..7da54d36b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesIT.java @@ -66,14 +66,7 @@ public void testCreateTable() throws Exception { Thread.sleep(40000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, - chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Map decimalTable = writer.getColumnsDataTypesForTable("numeric_types_DECIMAL_65_30"); Map dateTimeTable6 = writer.getColumnsDataTypesForTable("temporal_types_DATETIME6"); @@ -112,7 +105,7 @@ public void testCreateTable() throws Exception { //Thread.sleep(10000); //String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection connection = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java index 67b3e01bd..bb6b83cd8 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/CreateTableDataTypesTimeZoneIT.java @@ -79,14 +79,7 @@ public void testCreateTable() throws Exception { Thread.sleep(30000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); Map decimalTable = writer.getColumnsDataTypesForTable("numeric_types_DECIMAL_65_30"); Map dateTimeTable = writer.getColumnsDataTypesForTable("temporal_types_DATETIME6"); @@ -116,8 +109,9 @@ public void testCreateTable() throws Exception { writer.getConnection().close(); Thread.sleep(10000); - Connection conn2 = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn2 = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), + "employees", new ClickHouseSinkConnectorConfig(new HashMap<>())); writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn2); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java index 412c94828..19a1eb86b 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java @@ -3,6 +3,9 @@ import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; + import org.apache.log4j.BasicConfigurator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -14,8 +17,6 @@ import org.testcontainers.utility.DockerImageName; import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.Properties; @Testcontainers diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLIgnoreRegExIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLIgnoreRegExIT.java index 18f60f053..2329bee4c 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLIgnoreRegExIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLIgnoreRegExIT.java @@ -91,7 +91,7 @@ public void testDDLIgnoreRegex() throws Exception { String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",clickHouseContainer.getUsername(), + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java index 0ca50e5b7..aa8926742 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneColumnSchemaOnlyIT.java @@ -127,13 +127,7 @@ public void testSchemaOnlyMode() throws Exception { Thread.sleep(10000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java index 499447d3c..8413ff5a5 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneIT.java @@ -80,14 +80,7 @@ public void testCreateTable() throws Exception { Thread.sleep(30000); // Create connection. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // Validate temporal_types_DATETIME data. ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME"); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java index bf952f1b9..7a2c8c9b8 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java @@ -95,7 +95,7 @@ public void testCreateTable() throws Exception { //conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59',NULL);\n").execute(); String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection conn1 = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn1 = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn1); @@ -103,7 +103,7 @@ public void testCreateTable() throws Exception { writer.getConnection().close(); Thread.sleep(10000); - Connection conn2 = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn2 = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn2); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java index c53385ff2..526e2dfc0 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedDifferentTimeZoneIT.java @@ -107,13 +107,7 @@ public void testCreateTable() throws Exception { String DATETIME6_MID = "2022-09-29 01:50:56.123456"; String DATETIME6_MAX = "2299-12-31 23:59:59.999999"; - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // Validate that the MySQL server is set to Central timezone. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java index ef9722967..bead30de3 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithUserProvidedTimeZoneSchemaOnlyIT.java @@ -96,7 +96,7 @@ public void testCreateTable() throws Exception { String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + Connection chConn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), @@ -106,7 +106,7 @@ public void testCreateTable() throws Exception { writer.getConnection().close(); Thread.sleep(10000); - Connection chConn2 = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + Connection chConn2 = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java index bb5e46f9c..b88cdba86 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/EmployeesDBIT.java @@ -1,5 +1,6 @@ package com.altinity.clickhouse.debezium.embedded.ddl.parser; +import com.altinity.clickhouse.debezium.embedded.ITCommon; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; @@ -76,10 +77,7 @@ public void testEmployeesDB() throws Exception { Thread.sleep(40000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees"); - Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // Validate that all the tables are created. diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java index 7081648da..7c5254cc6 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/IsDeletedColumnsIT.java @@ -92,13 +92,7 @@ public void testIsDeleted(String clickHouseServerVersion) throws Exception { conn.close(); Thread.sleep(10000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); ResultSet rs = writer.executeQueryWithResultSet("select * from new_table"); boolean recordFound = false; while(rs.next()) { diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java index fe3b7c699..ba38ded84 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MultipleDatabaseIT.java @@ -119,13 +119,7 @@ public void testMultipleDatabases() throws Exception { conn.close(); // Create connection to clickhouse and validate if the tables are replicated. - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); // query clickhouse connection and get data for test_table1 and test_table2 @@ -162,7 +156,7 @@ public void testMultipleDatabases() throws Exception { // Jdbc url with test_db database. String testDb2JdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "test_db2"); - Connection testDb2Conn = BaseDbWriter.createConnection(testDb2JdbcUrl, "Client_1", + Connection testDb2Conn = BaseDbWriter.createConnection(testDb2JdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter testDb2Writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "test_db2", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, testDb2Conn); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java index b547f5d40..fbcfe339a 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java @@ -6,14 +6,12 @@ import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; -import com.clickhouse.jdbc.ClickHouseConnection; import org.apache.log4j.BasicConfigurator; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.MySQLContainer; @@ -122,13 +120,7 @@ public void testTableOperations(boolean databaseOverride) throws Exception { "fullname varchar(101) GENERATED ALWAYS AS (CONCAT(first_name,' ',last_name)),\n" + "email VARCHAR(100) NOT NULL);\n").execute(); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); conn.prepareStatement("create table new_table_copy like new_table").execute(); @@ -142,7 +134,7 @@ public void testTableOperations(boolean databaseOverride) throws Exception { // Validate table created with partitions. - String membersResult = writer.executeQuery("show create table members"); + String membersResult = writer.executeSystemQuery("show create table members"); Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" + "(\n" + " `firstname` String,\n" + @@ -158,7 +150,7 @@ public void testTableOperations(boolean databaseOverride) throws Exception { "ORDER BY tuple()\n" + "SETTINGS index_granularity = 8192")); - String rcxResult = writer.executeQuery("show create table rcx"); + String rcxResult = writer.executeSystemQuery("show create table rcx"); Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" + "(\n" + diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java index ba4378e1b..d01d78043 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TruncateTableIT.java @@ -79,13 +79,7 @@ public void testIsDeleted() throws Exception { Thread.sleep(30000); - String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees"); - Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", - clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); - - BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), - "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); //Validate if ship_class was truncated also in ClickHouse. diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java index 1f255f500..2208ec755 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java @@ -2,7 +2,6 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; -import com.clickhouse.jdbc.ClickHouseConnection; import com.clickhouse.jdbc.ClickHouseDataSource; @@ -19,6 +18,8 @@ public class BaseDbWriter { + public static final String DATABASE_CLIENT_NAME = "Sink_Connector"; + public static final String SYSTEM_DB = "system"; protected Connection conn; private String hostName; @@ -89,8 +90,9 @@ public static String getConnectionString(String hostName, Integer port, String d * @param userName UserName * @param password Password */ - public static Connection createConnection(String url, String clientName, String userName, String password, - ClickHouseSinkConnectorConfig config) { + public static Connection createConnection(String url, String clientName, String userName, + String password, String databaseName + , ClickHouseSinkConnectorConfig config) { String jdbcParams = ""; Connection conn = null; @@ -112,7 +114,7 @@ public static Connection createConnection(String url, String clientName, String url = url + "?user=" + userName + "&password=" + password; ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties); // Get connection from the pool. - HikariDbSource hikariDbSource = HikariDbSource.getInstance(dataSource); + HikariDbSource hikariDbSource = HikariDbSource.getInstance(dataSource, databaseName); // Create a new ClickHouseConnection object with the connection from the pool. // Convert Connection to ClickHouseConnection. @@ -133,12 +135,12 @@ public static Connection createConnection(String url, String clientName, String * @return * @throws SQLException */ - public String executeQuery(String sql) throws SQLException { + public String executeSystemQuery(String sql) throws SQLException { String result = null; if(this.conn == null) { String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", userName, password, config); + conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, BaseDbWriter.SYSTEM_DB, config); } ResultSet rs = this.conn.prepareStatement(sql).executeQuery(); if(rs != null) { @@ -172,7 +174,7 @@ public ResultSet executeQueryWithResultSet(String sql) throws SQLException { * @throws SQLException */ public String getClickHouseVersion() throws SQLException { - return this.executeQuery("SELECT VERSION()"); + return this.executeSystemQuery("SELECT VERSION()"); } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java index 921dee1ec..2f8817bb6 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java @@ -7,33 +7,35 @@ import java.sql.Connection; import java.sql.SQLException; -// Singleton class. +// Singleton class(one per database) public class HikariDbSource { private static HikariDbSource instance; private HikariDataSource dataSource; + private String databaseName; // private constructor - private HikariDbSource(ClickHouseDataSource dataSource) { - this.createConnectionPool(dataSource); + private HikariDbSource(ClickHouseDataSource dataSource, String databaseName) { + this.createConnectionPool(dataSource, databaseName); } public Connection getConnection() throws SQLException { return this.dataSource.getConnection(); } - public static HikariDbSource getInstance(ClickHouseDataSource dataSource) { + public static HikariDbSource getInstance(ClickHouseDataSource dataSource, String databaseName) { if (instance == null) { - instance = new HikariDbSource(dataSource); + instance = new HikariDbSource(dataSource, databaseName); } return instance; } - public void createConnectionPool(ClickHouseDataSource dataSource) { + public void createConnectionPool(ClickHouseDataSource dataSource, String databaseName) { // pass the clickhouse config to create the datasource HikariConfig poolConfig = new HikariConfig(); - poolConfig.setJdbcUrl("jdbc:ch:{hostname}:{port}/default?insert_quorum=auto&server_time_zone&server_version=22.13.1.24495"); + String jdbcUrl = String.format("jdbc:ch:{hostname}:{port}/%s?insert_quorum=auto&server_time_zone&server_version=22.13.1.24495", databaseName); + poolConfig.setJdbcUrl(jdbcUrl); poolConfig.setDriverClassName("com.clickhouse.jdbc.ClickHouseDriver"); // Ensure driver is set // poolConfig.setUsername(dataSource.getConnection().getCurrentUser()); // Optional, if already in JDBC URL // poolConfig.setPassword(dataSource.getConnection().()); // Optional, if already in JDBC URL diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 4b6867363..394fbdfb0 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -81,7 +81,7 @@ public ClickHouseBatchRunnable(LinkedBlockingQueue> recor //this.topicToRecordsMap = new HashMap<>(); this.dbCredentials = parseDBConfiguration(); - this.systemConnection = createConnection(); + this.systemConnection = createConnection(BaseDbWriter.SYSTEM_DB); try { @@ -92,12 +92,12 @@ public ClickHouseBatchRunnable(LinkedBlockingQueue> recor } } - private Connection createConnection() { + private Connection createConnection(String databaseName) { String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(), this.dbCredentials.getPort(), "system"); return BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", this.dbCredentials.getUserName(), - this.dbCredentials.getPassword(), config); + this.dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config); } // Function to check if we have already stored a ClickHouseConnection @@ -110,8 +110,8 @@ private Connection getClickHouseConnection(String databaseName) { String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(), this.dbCredentials.getPort(), databaseName); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", - this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), databaseName, config); this.databaseToConnectionMap.put(databaseName, conn); return conn; diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java index 6c7a60d6e..12bc3d362 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java @@ -68,7 +68,7 @@ public ClickHouseBatchWriter( //this.topicToRecordsMap = new HashMap<>(); this.dbCredentials = parseDBConfiguration(); - this.systemConnection = createConnection(); + this.systemConnection = createConnection(BaseDbWriter.SYSTEM_DB); try { @@ -79,12 +79,12 @@ public ClickHouseBatchWriter( } } - private Connection createConnection() { + private Connection createConnection(String database) { String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(), this.dbCredentials.getPort(), "system"); return BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", this.dbCredentials.getUserName(), - this.dbCredentials.getPassword(), config); + this.dbCredentials.getPassword(), database,config); } // Function to check if we have already stored a ClickHouseConnection @@ -97,8 +97,8 @@ private Connection getClickHouseConnection(String databaseName) { String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(), this.dbCredentials.getPort(), databaseName); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", - this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), config); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, + this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), databaseName, config); this.databaseToConnectionMap.put(databaseName, conn); return conn; diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java index 2447f171e..05b803aed 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java @@ -63,7 +63,8 @@ public void convert() throws SQLException { String password = clickHouseContainer.getPassword(); String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = BaseDbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); BaseDbWriter dbWriter = new BaseDbWriter(dbHostName, port, database, userName, password, null, conn); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java index b3edbaf88..be649928d 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java @@ -1,6 +1,7 @@ package com.altinity.clickhouse.sink.connector.converters; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; import com.altinity.clickhouse.sink.connector.db.DbWriter; import com.altinity.clickhouse.sink.connector.metadata.DataTypeRange; import com.clickhouse.data.ClickHouseDataType; @@ -252,7 +253,8 @@ public void testBatchArrays() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = DbWriter.getConnectionString(hostName, port, database); - Connection conn1 = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config); + Connection conn1 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn1); String url = dbWriter.getConnectionString(hostName, port, database); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java index eaf25a57f..fbf0b2dbd 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java @@ -23,7 +23,7 @@ public void testSplitJdbcProperties() { ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(props); Connection conn = BaseDbWriter.createConnection( - "localhost", "client_1","default", "", config); + "localhost", BaseDbWriter.DATABASE_CLIENT_NAME,"default", "",BaseDbWriter.SYSTEM_DB, config); Properties properties = new BaseDbWriter( "localhost", 8123, diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java index a7581a33a..031bc8265 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java @@ -73,7 +73,8 @@ public void testCheckIfDatabaseExists() throws SQLException { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); @@ -148,7 +149,8 @@ public void getTestGetServerTimeZone() { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB,new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); ZoneId serverTimeZone = new DBMetadata().getServerTimeZone(writer.getConnection()); @@ -167,7 +169,8 @@ public void getAliasAndMaterializedColumnsList() throws SQLException { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); Set aliasColumns = new DBMetadata().getAliasAndMaterializedColumnsForTableAndDatabase("people", "employees2", conn); Assert.assertTrue(aliasColumns.size() == 2); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java index b7a25770f..0eb1073e9 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java @@ -40,7 +40,8 @@ public void testInsertTopicOffsetMetadata() throws SQLException { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB,new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java index af9d994fd..d3f6cc377 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java @@ -50,8 +50,8 @@ public static void init() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, - config); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); writer = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn); } @@ -104,7 +104,8 @@ public void testGetColumnsDataTypesForTable() { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); @@ -115,7 +116,8 @@ public void testGetColumnsDataTypesForTable() { String database2 = "employees2"; String jdbcUrl2 = BaseDbWriter.getConnectionString(dbHostName, port, database2); - Connection conn2 = DbWriter.createConnection(jdbcUrl2, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn2 = DbWriter.createConnection(jdbcUrl2, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer2 = new DbWriter(dbHostName, port, database2, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn2); Map columnDataTypesMap2 = writer2.getColumnsDataTypesForTable("employees"); @@ -136,8 +138,8 @@ public void testGetEngineType() { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, - new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); MutablePair result = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "default", "employees"); @@ -168,8 +170,8 @@ public void testGetEngineTypeUsingSystemTables() { String tableName = "employees"; String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, - new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); MutablePair< DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(), @@ -238,7 +240,8 @@ public void testGroupRecords() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); //String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn = DbWriter.createConnection(connectionUrl, "client_1", userName, password, config); + Connection conn = DbWriter.createConnection(connectionUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); DbWriter dbWriter = new DbWriter(dbHostName, port, database, tableName, userName, password, config, null, conn); Map>, List> queryToRecordsMap = new HashMap<>(); @@ -282,7 +285,8 @@ public void testGetClickHouseDataType() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn); PreparedStatementExecutor preparedStatementExecutor = new PreparedStatementExecutor(null, false, null, null, database, ZoneId.of("UTC")); @@ -313,7 +317,8 @@ public void testBatchArrays() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn2 = DbWriter.createConnection(jdbcUrl, "client_1", userName, "", config); + Connection conn2 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, "", + BaseDbWriter.SYSTEM_DB, config); DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, "", config, null, conn2); String url = dbWriter.getConnectionString(hostName, port, database); @@ -378,7 +383,8 @@ public void testBatchInsert() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn2 = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config); + Connection conn2 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn2); String url = dbWriter.getConnectionString(hostName, port, database); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java index ab5024cef..ba6141a81 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java @@ -56,7 +56,8 @@ static void initialize() { String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, config); DbWriter writer = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn); conn = writer.getConnection(); @@ -171,7 +172,8 @@ public void testCreateNewTable() { String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>())); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>())); DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn); diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java index 3cd49b62d..7c55dc4b2 100644 --- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java +++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java @@ -44,7 +44,8 @@ static void initialize() { ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>()); String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, systemDb); - Connection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config); + Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, + DbWriter.SYSTEM_DB, config); dbWriter = new DbWriter(hostName, port, dbName, null, userName, password, config, null, conn); maintenanceDbWriter = new DbWriter(hostName, port, systemDb, null, userName, password, config, null, conn); }