Skip to content

Commit

Permalink
Added unit test to test get connector type call.
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Jan 28, 2025
1 parent 6fd6fb7 commit 2e4efae
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 9 deletions.
12 changes: 11 additions & 1 deletion sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,17 @@
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public static String getProperty(final Map<String, String> config, final String
*/
static ConfigDef newConfigDef() {
return new ConfigDef()
.define(
ClickHouseSinkConnectorConfigVariables.CONNECTOR_CLASS.toString(),
Type.STRING,
"",
null,
Importance.HIGH,
"Connector class"
)
// Config Group "Connector config"
.define(
ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_TOPICS_TABLES_MAP.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
REPLICA_STATUS_VIEW("replica.status.view"),
MAX_QUEUE_SIZE("sink.connector.max.queue.size"),

SINGLE_THREADED("single.threaded");
SINGLE_THREADED("single.threaded"),

CONNECTOR_CLASS("connector.class");
private String label;

ClickHouseSinkConnectorConfigVariables(String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@ public String getValue() {
}

public static ConnectorType fromString(String value) {
ConnectorType connectorType = ConnectorType.MYSQL;

ConnectorDescriptor.getDisplayNameForConnectorClass(value);
return ConnectorType.valueOf(value);
String displayName = ConnectorDescriptor.getIdForConnectorClass(value);
if(displayName != null) {
connectorType =ConnectorType.valueOf(displayName);
if(displayName.contains(MYSQL.getValue())) {
connectorType = ConnectorType.MYSQL;
} else if(displayName.contains(POSTGRES.getValue())) {
connectorType = ConnectorType.POSTGRES;
}
}
return connectorType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.model.DBCredentials;
import com.clickhouse.jdbc.ClickHouseConnection;
import io.debezium.embedded.EmbeddedEngineConfig;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -130,6 +131,16 @@ private DBCredentials parseDBConfiguration() {
return dbCredentials;
}

ConnectorType getConnectorType() {
ConnectorType connectorType = ConnectorType.MYSQL;

try {
connectorType = ConnectorType.fromString(config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS.toString()));
} catch (Exception e) {
log.error("Error while getting connector type", e);
}
return connectorType;
}
/**
* Main run loop of the thread
* which is called based on the schedule
Expand All @@ -138,7 +149,7 @@ private DBCredentials parseDBConfiguration() {
@Override
public void run() {

ConnectorType connectorType = ConnectorType.fromString(config.getString(CONNECTOR_CLASS));
ConnectorType connectorType = getConnectorType();
Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
try {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.altinity.clickhouse.sink.connector.common;

import com.altinity.clickhouse.sink.connector.common.SnowFlakeId;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.altinity.clickhouse.sink.connector.executor;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import org.junit.Assert;
import org.junit.Before;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -24,7 +25,7 @@ public class ClickHouseBatchRunnableTest {
LinkedBlockingQueue<List<ClickHouseStruct>> records = new LinkedBlockingQueue<>();
Map<String, String> topic2TableMap = new HashMap<>();

@Before
@BeforeEach
public void initTest() {


Expand Down Expand Up @@ -82,6 +83,20 @@ public Struct getKafkaStruct() {
return kafkaConnectStruct;
}

@Test
public void testGetConnectorType() {
HashMap<String, String> configMap = new HashMap<>();
configMap.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");

ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(configMap);
ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);

ConnectorType connectorType = run.getConnectorType();
Assert.assertTrue(connectorType == ConnectorType.MYSQL);
}



@Test
public void testGetTableNameFromTopic() {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<String, String>());
Expand Down

0 comments on commit 2e4efae

Please sign in to comment.