Skip to content

Commit

Permalink
Merge pull request #759 from Altinity/733-postgresql-truncate-breaks-lightweight-sink-connector
Browse files Browse the repository at this point in the history

Added integration test to validate truncate event replication in PostgreSQL
  • Loading branch information
subkanthi authored Aug 23, 2024
2 parents f9c2909 + 2d30bc4 commit b7c2955
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,14 @@ static public Connection connectToMySQL(MySQLContainer mySqlContainer) {
}

// Function to connect to Postgres.
static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLContainer) {
static public Connection connectToPostgreSQL(PostgreSQLContainer postgreSQLContainer) throws SQLException {
Connection conn = null;
try {

String connectionUrl = String.format("jdbc:postgresql://%s:%s/%s?user=%s&password=%s", postgreSQLContainer.getHost(),
postgreSQLContainer.getFirstMappedPort(),
postgreSQLContainer.getDatabaseName(), postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword());
conn = DriverManager.getConnection(connectionUrl);

} catch (SQLException ex) {

}
return conn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void startContainers() throws InterruptedException {
public Properties getProperties() throws Exception {

Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer);
properties.put("plugin.name", "decoderbufs");
properties.put("plugin.name", "pgoutput");
properties.put("plugin.path", "/");
properties.put("table.include.list", "public.tm");
properties.put("slot.max.retries", "6");
Expand All @@ -83,7 +83,8 @@ public Properties getProperties() throws Exception {
properties.put("table.include.list", "public.tm,public.tm2,public.redata");
properties.put("offset.storage.jdbc.offset.table.ddl", "CREATE TABLE if not exists %s on cluster '{cluster}' (id String, offset_key String, offset_val String, record_insert_ts DateTime, record_insert_seq UInt64) ENGINE = KeeperMap('/asc_offsets201',10) PRIMARY KEY offset_key");
properties.put("offset.storage.jdbc.offset.table.delete", "select 1");

properties.put("skipped.operations","none");
properties.put("disable.drop.truncate", "false");
return properties;
}

Expand Down Expand Up @@ -151,6 +152,18 @@ public void testDecoderBufsPlugin() throws Exception {
Assert.assertTrue(offsetValue.contains("ts_usec"));
Assert.assertTrue(offsetValue.contains("snapshot"));

// Connect to postgreSQL and issue a truncate table command.
ITCommon.connectToPostgreSQL(postgreSQLContainer).prepareStatement("truncate table public.tm").execute();
Thread.sleep(15000);

// Check if the clickhouse table is empty.
chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}

//Assert.assertTrue(tmCount == 0);

if(engine.get() != null) {
engine.get().stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public boolean groupQueryWithRecords(List<ClickHouseStruct> records,
Iterator iterator = records.iterator();
while (iterator.hasNext()) {
ClickHouseStruct record = (ClickHouseStruct) iterator.next();
updatePartitionOffsetMap(partitionToOffsetMap, record.getKafkaPartition(), record.getTopic(), record.getKafkaOffset());
if(record != null && record.getKafkaPartition() != null && record.getTopic() != null ) {
updatePartitionOffsetMap(partitionToOffsetMap, record.getKafkaPartition(), record.getTopic(), record.getKafkaOffset());
}
boolean enableSchemaEvolution = config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_SCHEMA_EVOLUTION.toString());

if(CdcRecordState.CDC_RECORD_STATE_BEFORE == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,23 @@ public static List<ClickHouseStruct> getSampleRecords() {

@Test
public void testGroupRecords() {
String hostName = "remoteClickHouse";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
String database = "default";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";

String connectionUrl = writer.getConnectionString(hostName, port, database);

String connectionUrl = writer.getConnectionString(dbHostName, port, database);
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");

ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());

String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn);
//String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
ClickHouseConnection conn = DbWriter.createConnection(connectionUrl, "client_1", userName, password, config);
DbWriter dbWriter = new DbWriter(dbHostName, port, database, tableName, userName, password, config, null, conn);

Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();

Expand Down

0 comments on commit b7c2955

Please sign in to comment.