Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaV2SinkConnector #38973

Merged
merged 15 commits into from
Mar 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSinkConnector"/>

<!-- Checkstyle suppressions for resource manager package -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>
Expand Down
1 change: 1 addition & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
## Cosmos Kafka connector under sdk\cosmos\azure-cosmos-kafka-connect\pom.xml
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
cosmos_com.jayway.jsonpath:json-path;2.9.0
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
cosmos_org.testcontainers:testcontainers;1.19.5
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added Source connector. See [PR 38748](https://github.com/Azure/azure-sdk-for-java/pull/38748)
* Added Sink connector. See [PR 38973](https://github.com/Azure/azure-sdk-for-java/pull/38973)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,16 @@
| `kafka.connect.cosmos.source.metadata.storage.topic` | `_cosmos.metadata.topic` | The name of the topic where the metadata are stored. The metadata topic will be created if it does not already exist, else it will use the pre-created topic. |
| `kafka.connect.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. |
| `kafka.connect.cosmos.source.messageKey.field` | `id` | The field to use as the message key. |

## Sink Connector Configuration
| Config Property Name | Default | Description |
|:---------------------------------------------------------------|:--------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.connect.cosmos.sink.database.name` | None | Cosmos DB database name. |
| `kafka.connect.cosmos.sink.containers.topicMap` | None | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. |
| `kafka.connect.cosmos.sink.errors.tolerance` | `None` | Error tolerance level after exhausting all retries. `None` for fail on error. `All` for log and continue |
| `kafka.connect.cosmos.sink.bulk.enabled` | `true` | Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector. By default it is true. |
| `kafka.connect.cosmos.sink.bulk.maxConcurrentCosmosPartitions` | `-1` | Cosmos DB Item Write Max Concurrent Cosmos Partitions. If not specified it will be determined based on the number of the container's physical partitions which would indicate every batch is expected to have data from all Cosmos physical partitions. If specified it indicates from at most how many Cosmos Physical Partitions each batch contains data. So this config can be used to make bulk processing more efficient when input data in each batch has been repartitioned to balance to how many Cosmos partitions each batch needs to write. This is mainly useful for very large containers (with hundreds of physical partitions. |
| `kafka.connect.cosmos.sink.bulk.initialBatchSize` | `1` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 1. Reduce this when you want to avoid that the first few requests consume too many RUs. |
| `kafka.connect.cosmos.sink.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), `ItemDelete` (deletes based on id/pk of data frame), `ItemDeleteIfNotModified` (deletes based on id/pk of data frame if etag hasn't changed since collecting id/pk), `ItemOverwriteIfNotModified` (using create if etag is empty, update/replace with etag pre-condition otherwise, if document was updated the pre-condition failure is ignored) |
| `kafka.connect.cosmos.sink.maxRetryCount` | `10` | Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times. |
| `kafka.connect.cosmos.sink.id.strategy` | `ProvidedInValueStrategy` | A strategy used to populate the document with an ``id``. Valid strategies are: ``TemplateStrategy``, ``FullKeyStrategy``, ``KafkaMetadataStrategy``, ``ProvidedInKeyStrategy``, ``ProvidedInValueStrategy``. Configuration properties prefixed with``id.strategy`` are passed through to the strategy. For example, when using ``id.strategy=TemplateStrategy`` , the property ``id.strategy.template`` is passed through to the template strategy and used to specify the template string to be used in constructing the ``id``. |
25 changes: 24 additions & 1 deletion sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
</javaModulesSurefireArgLine>
</properties>

Expand Down Expand Up @@ -80,6 +86,13 @@ Licensed under the MIT License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos-test</artifactId>
<version>1.0.0-beta.7</version> <!-- {x-version-update;com.azure:azure-cosmos-test;current} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand All @@ -93,6 +106,11 @@ Licensed under the MIT License.
<scope>test</scope>
<version>1.10.0</version> <!-- {x-version-update;org.apache.commons:commons-text;external_dependency} -->
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>json-path</artifactId>
<version>2.9.0</version> <!-- {x-version-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down Expand Up @@ -235,6 +253,7 @@ Licensed under the MIT License.
<include>com.azure:*</include>
<include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
<include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
<include>com.jayway.jsonpath:json-path:[2.9.0]</include> <!-- {x-include-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
<include>org.sourcelab:kafka-connect-client:[4.0.4]</include> <!-- {x-include-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
</includes>
</bannedDependencies>
Expand Down Expand Up @@ -319,6 +338,10 @@ Licensed under the MIT License.
<pattern>reactor</pattern>
<shadedPattern>${shadingPrefix}.reactor</shadedPattern>
</relocation>
<relocation>
<pattern>com.jayway.jsonpath</pattern>
<shadedPattern>${shadingPrefix}.com.jayway.jsonpath</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<excludes>
Expand Down Expand Up @@ -456,7 +479,7 @@ Licensed under the MIT License.
</profile>
<profile>
<!-- integration tests, requires Cosmos DB Emulator Endpoint -->
<id>kafka-integration</id>
<id>kafka</id>
<properties>
<test.groups>kafka</test.groups>
</properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* A Sink connector that publishes topic messages to CosmosDB.
*/
public class CosmosDBSinkConnector extends SinkConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBSinkConnector.class);

private CosmosSinkConfig sinkConfig;

@Override
public void start(Map<String, String> props) {
LOGGER.info("Starting the kafka cosmos sink connector");
this.sinkConfig = new CosmosSinkConfig(props);
}

@Override
public Class<? extends Task> taskClass() {
return CosmosSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
LOGGER.info("Setting task configurations with maxTasks {}", maxTasks);
List<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
configs.add(this.sinkConfig.originalsStrings());
}

return configs;
}

@Override
public void stop() {
LOGGER.debug("Kafka Cosmos sink connector {} is stopped.");
}

@Override
public ConfigDef config() {
return CosmosSinkConfig.getConfigDef();
}

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
Expand Down Expand Up @@ -102,8 +102,8 @@ public ConfigDef config() {

@Override
public String version() {
return CosmosConstants.CURRENT_VERSION;
} // TODO[public preview]: how this is being used
return KafkaCosmosConstants.CURRENT_VERSION;
}

private List<Map<String, String>> getTaskConfigs(int maxTasks) {
Pair<MetadataTaskUnit, List<FeedRangeTaskUnit>> taskUnits = this.getAllTaskUnits();
Expand Down Expand Up @@ -314,7 +314,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
.getContainer(containerProperties.getId())
.getFeedRanges()
.onErrorMap(throwable ->
CosmosExceptionsHelper.convertToConnectException(
KafkaCosmosExceptionsHelper.convertToConnectException(
throwable,
"GetFeedRanges failed for container " + containerProperties.getId()))
.block()
Expand All @@ -324,15 +324,7 @@ private List<Range<String>> getFeedRanges(CosmosContainerProperties containerPro
}

private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties> allContainers) {
Map<String, String> topicMapFromConfig =
this.config.getContainersConfig().getContainersTopicMap()
.stream()
.map(containerTopicMapString -> containerTopicMapString.split("#"))
.collect(
Collectors.toMap(
containerTopicMapArray -> containerTopicMapArray[1],
containerTopicMapArray -> containerTopicMapArray[0]));

Map<String, String> topicMapFromConfig = this.config.getContainersConfig().getContainerToTopicMap();
Map<String, String> effectiveContainersTopicMap = new HashMap<>();
allContainers.forEach(containerProperties -> {
// by default, we are using container id as the topic name as well unless customer override through containers.topicMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
}

return CosmosConstants.USER_AGENT_SUFFIX;
return KafkaCosmosConstants.USER_AGENT_SUFFIX;
}
}
Loading
Loading