Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaV2SinkConnector #38973

Merged
merged 15 commits into from
Mar 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSinkConnector"/>

<!-- Checkstyle suppressions for resource manager package -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>
Expand Down
1 change: 1 addition & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
## Cosmos Kafka connector under sdk\cosmos\azure-cosmos-kafka-connect\pom.xml
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
cosmos_com.jayway.jsonpath:json-path;2.9.0
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
cosmos_org.testcontainers:testcontainers;1.19.5
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added Source connector. See [PR 38748](https://github.com/Azure/azure-sdk-for-java/pull/38748)
* Added Sink connector. See [PR 38973](https://github.com/Azure/azure-sdk-for-java/pull/38973)

#### Breaking Changes

Expand Down

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
</javaModulesSurefireArgLine>
</properties>

Expand Down Expand Up @@ -80,6 +84,13 @@ Licensed under the MIT License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos-test</artifactId>
<version>1.0.0-beta.7</version> <!-- {x-version-update;com.azure:azure-cosmos-test;current} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand All @@ -93,6 +104,11 @@ Licensed under the MIT License.
<scope>test</scope>
<version>1.10.0</version> <!-- {x-version-update;org.apache.commons:commons-text;external_dependency} -->
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>json-path</artifactId>
<version>2.9.0</version> <!-- {x-version-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down Expand Up @@ -235,6 +251,7 @@ Licensed under the MIT License.
<include>com.azure:*</include>
<include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
<include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
<include>com.jayway.jsonpath:json-path:[2.9.0]</include> <!-- {x-include-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
<include>org.sourcelab:kafka-connect-client:[4.0.4]</include> <!-- {x-include-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
</includes>
</bannedDependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * A Sink connector that publishes topic messages to CosmosDB.
 *
 * <p>The connector parses its properties into a {@link CosmosSinkConfig} on {@link #start(Map)} and
 * hands the string form of that configuration to every task it is asked to configure. The tasks
 * themselves are {@link CosmosSinkTask} instances.
 */
public class CosmosDBSinkConnector extends SinkConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBSinkConnector.class);

    private CosmosSinkConfig sinkConfig;

    /**
     * Parses the supplied connector properties and keeps the resulting configuration.
     *
     * @param props the connector properties.
     */
    @Override
    public void start(Map<String, String> props) {
        LOGGER.info("Starting the kafka cosmos sink connector");
        this.sinkConfig = new CosmosSinkConfig(props);
    }

    /**
     * @return the task implementation used by this connector, {@link CosmosSinkTask}.
     */
    @Override
    public Class<? extends Task> taskClass() {
        return CosmosSinkTask.class;
    }

    /**
     * Builds one configuration map per task; every entry is produced by
     * {@code sinkConfig.originalsStrings()}.
     *
     * @param maxTasks the number of task configurations to produce.
     * @return a list with {@code maxTasks} task configurations.
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        LOGGER.info("Setting task configurations with maxTasks {}", maxTasks);
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(this.sinkConfig.originalsStrings());
        }

        return configs;
    }

    /**
     * Logs that the connector has stopped; no other work is done here.
     */
    @Override
    public void stop() {
        // No "{}" placeholder: there is no argument to substitute, so one would be logged literally.
        LOGGER.debug("Kafka Cosmos sink connector is stopped.");
    }

    /**
     * @return the configuration definition exposed by {@link CosmosSinkConfig#getConfigDef()}.
     */
    @Override
    public ConfigDef config() {
        return CosmosSinkConfig.getConfigDef();
    }

    /**
     * @return the connector version, read from {@link KafkaCosmosConstants#CURRENT_VERSION}.
     */
    @Override
    public String version() {
        return KafkaCosmosConstants.CURRENT_VERSION;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
Expand Down Expand Up @@ -102,8 +102,8 @@ public ConfigDef config() {

@Override
public String version() {
return CosmosConstants.CURRENT_VERSION;
} // TODO[public preview]: how this is being used
return KafkaCosmosConstants.CURRENT_VERSION;
}

private List<Map<String, String>> getTaskConfigs(int maxTasks) {
Pair<MetadataTaskUnit, List<FeedRangeTaskUnit>> taskUnits = this.getAllTaskUnits();
Expand Down Expand Up @@ -314,7 +314,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
.getContainer(containerProperties.getId())
.getFeedRanges()
.onErrorMap(throwable ->
CosmosExceptionsHelper.convertToConnectException(
KafkaCosmosExceptionsHelper.convertToConnectException(
throwable,
"GetFeedRanges failed for container " + containerProperties.getId()))
.block()
Expand All @@ -324,15 +324,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
}

private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties> allContainers) {
Map<String, String> topicMapFromConfig =
this.config.getContainersConfig().getContainersTopicMap()
.stream()
.map(containerTopicMapString -> containerTopicMapString.split("#"))
.collect(
Collectors.toMap(
containerTopicMapArray -> containerTopicMapArray[1],
containerTopicMapArray -> containerTopicMapArray[0]));

Map<String, String> topicMapFromConfig = this.config.getContainersConfig().getContainerToTopicMap();
Map<String, String> effectiveContainersTopicMap = new HashMap<>();
allContainers.forEach(containerProperties -> {
// by default, we are using container id as the topic name as well unless customer override through containers.topicMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
}

return CosmosConstants.USER_AGENT_SUFFIX;
return KafkaCosmosConstants.USER_AGENT_SUFFIX;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceContainersConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -20,7 +21,7 @@
/**
* Common Configuration for Cosmos DB Kafka source connector and sink connector.
*/
public class CosmosConfig extends AbstractConfig {
public class KafkaCosmosConfig extends AbstractConfig {
protected static final ConfigDef.Validator NON_EMPTY_STRING = new ConfigDef.NonEmptyString();
private static final String CONFIG_PREFIX = "kafka.connect.cosmos.";

Expand Down Expand Up @@ -51,7 +52,7 @@ public class CosmosConfig extends AbstractConfig {

private final CosmosAccountConfig accountConfig;

public CosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
public KafkaCosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
super(config, parsedConfig);
this.accountConfig = this.parseAccountConfig();
}
Expand Down Expand Up @@ -151,6 +152,18 @@ public CosmosAccountConfig getAccountConfig() {
return accountConfig;
}

/**
 * Splits a comma-delimited configuration value into a list of trimmed entries.
 *
 * <p>The value may optionally be wrapped in square brackets, for example {@code "[a, b]"};
 * the brackets are removed before splitting. A {@code null}, empty or blank value, as well as
 * a bracket pair with nothing but whitespace inside ({@code "[]"}, {@code "[ ]"}), yields an
 * empty list rather than a list holding a single empty string.
 *
 * @param configValue the raw configuration value, may be {@code null}.
 * @return a new mutable list of trimmed entries; never {@code null}.
 */
protected static List<String> convertToList(String configValue) {
    if (configValue == null || configValue.isEmpty()) {
        return new ArrayList<>();
    }

    String listBody = configValue;
    if (listBody.startsWith("[") && listBody.endsWith("]")) {
        listBody = listBody.substring(1, listBody.length() - 1);
    }

    // Splitting an empty string would produce one empty entry, so report "no entries" explicitly.
    if (listBody.trim().isEmpty()) {
        return new ArrayList<>();
    }

    return Arrays.stream(listBody.split(",")).map(String::trim).collect(Collectors.toList());
}

public static class AccountEndpointValidator implements ConfigDef.Validator {
@Override
@SuppressWarnings("unchecked")
Expand All @@ -173,15 +186,39 @@ public String toString() {
}
}

protected static List<String> convertToList(String configValue) {
if (StringUtils.isNotEmpty(configValue)) {
if (configValue.startsWith("[") && configValue.endsWith("]")) {
configValue = configValue.substring(1, configValue.length() - 1);
public static class ContainersTopicMapValidator implements ConfigDef.Validator {
private static final String INVALID_TOPIC_MAP_FORMAT =
"Invalid entry for topic-container map. The topic-container map should be a comma-delimited "
+ "list of Kafka topic to Cosmos containers. Each mapping should be a pair of Kafka "
+ "topic and Cosmos container separated by '#'. For example: topic1#con1,topic2#con2.";

@Override
@SuppressWarnings("unchecked")
public void ensureValid(String name, Object o) {
String configValue = (String) o;
if (StringUtils.isEmpty(configValue)) {
return;
}

return Arrays.stream(configValue.split(",")).map(String::trim).collect(Collectors.toList());
List<String> containerTopicMapList = convertToList(configValue);

// validate each item should be in topic#container format
boolean invalidFormatExists =
containerTopicMapList
.stream()
.anyMatch(containerTopicMap ->
containerTopicMap
.split(CosmosSourceContainersConfig.CONTAINER_TOPIC_MAP_SEPARATOR)
.length != 2);

if (invalidFormatExists) {
throw new ConfigException(name, o, INVALID_TOPIC_MAP_FORMAT);
}
}

return new ArrayList<>();
@Override
public String toString() {
return "Containers topic map";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.azure.core.util.CoreUtils;

public class CosmosConstants {
public class KafkaCosmosConstants {
public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties";
public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version");
public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

public class CosmosExceptionsHelper {
public class KafkaCosmosExceptionsHelper {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
public static boolean isTransientFailure(int statusCode, int substatusCode) {
return statusCode == HttpConstants.StatusCodes.GONE
|| statusCode == HttpConstants.StatusCodes.SERVICE_UNAVAILABLE
Expand Down Expand Up @@ -43,10 +43,42 @@ public static boolean isFeedRangeGoneException(int statusCode, int substatusCode
}

public static ConnectException convertToConnectException(Throwable throwable, String message) {
if (CosmosExceptionsHelper.isTransientFailure(throwable)) {
if (KafkaCosmosExceptionsHelper.isTransientFailure(throwable)) {
return new RetriableException(message, throwable);
}

return new ConnectException(message, throwable);
}

/**
 * Tells whether the failure is a {@link CosmosException} carrying the
 * {@code HttpConstants.StatusCodes.CONFLICT} status code.
 *
 * @param throwable the failure to inspect.
 * @return {@code true} only for a {@code CosmosException} with the CONFLICT status code.
 */
public static boolean isResourceExistsException(Throwable throwable) {
    return throwable instanceof CosmosException
        && ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.CONFLICT;
}

/**
 * Tells whether the failure is a {@link CosmosException} carrying the
 * {@code HttpConstants.StatusCodes.NOTFOUND} status code.
 *
 * @param throwable the failure to inspect.
 * @return {@code true} only for a {@code CosmosException} with the NOTFOUND status code.
 */
public static boolean isNotFoundException(Throwable throwable) {
    return throwable instanceof CosmosException
        && ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.NOTFOUND;
}

/**
 * Tells whether the failure is a {@link CosmosException} carrying the
 * {@code HttpConstants.StatusCodes.PRECONDITION_FAILED} status code.
 *
 * @param throwable the failure to inspect.
 * @return {@code true} only for a {@code CosmosException} with the PRECONDITION_FAILED status code.
 */
public static boolean isPreconditionFailedException(Throwable throwable) {
    return throwable instanceof CosmosException
        && ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.PRECONDITION_FAILED;
}

/**
 * Tells whether the failure is a {@link CosmosException} carrying the
 * {@code HttpConstants.StatusCodes.REQUEST_TIMEOUT} status code.
 *
 * @param throwable the failure to inspect.
 * @return {@code true} only for a {@code CosmosException} with the REQUEST_TIMEOUT status code.
 */
public static boolean isTimeoutException(Throwable throwable) {
    return throwable instanceof CosmosException
        && ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.REQUEST_TIMEOUT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Reactor scheduler definitions for the Kafka Cosmos connector. It currently declares a single
 * bounded-elastic scheduler which, going by its name, is meant for the sink.
 */
public class KafkaCosmosSchedulers {
    private static final String SINK_SCHEDULER_NAME = "sink-bounded-elastic";

    // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
    private static final int SINK_SCHEDULER_TTL_SECONDS = 60;

    /**
     * Bounded-elastic scheduler created with Reactor's default thread cap and queue size.
     * The trailing {@code true} is the {@code daemon} argument of
     * {@code Schedulers.newBoundedElastic}.
     */
    public static final Scheduler SINK_BOUNDED_ELASTIC =
        Schedulers.newBoundedElastic(
            Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
            Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
            SINK_SCHEDULER_NAME,
            SINK_SCHEDULER_TTL_SECONDS,
            true);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation.sink;

import org.apache.kafka.connect.errors.ConnectException;

/**
 * Generic CosmosDb sink write exceptions.
 */
public class CosmosDBWriteException extends ConnectException {
    private static final long serialVersionUID = 1L;

    /**
     * Creates a write exception with a description of the failure.
     *
     * @param message the description of the failure.
     */
    public CosmosDBWriteException(String message) {
        super(message);
    }

    /**
     * Creates a write exception that keeps the underlying failure as its cause, so the original
     * stack trace is not lost when a lower-level error is wrapped.
     *
     * @param message the description of the failure.
     * @param cause the underlying failure.
     */
    public CosmosDBWriteException(String message, Throwable cause) {
        super(message, cause);
    }
}
Loading
Loading