MiscellaneousChanges (#39827)
* Set up CI and integration tests
---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
xinlian12 and annie-mac authored Apr 23, 2024
1 parent 3c12cc4 commit b4588d0
Showing 28 changed files with 570 additions and 122 deletions.
14 changes: 14 additions & 0 deletions eng/pipelines/templates/stages/cosmos-emulator-matrix.json
@@ -113,6 +113,20 @@
"DESIRED_CONSISTENCIES": "[\"Session\"]",
"JavaTestVersion": "1.11",
"AdditionalArgs": "-DargLine=\"-DACCOUNT_HOST=https://localhost:8081/ -Dhadoop.home.dir=D:/Hadoop\""
},
"Kafka Integration Tests targeting Cosmos Emulator - Java 11": {
"ProfileFlag": "-Pkafka-emulator",
"PROTOCOLS": "[\"Tcp\"]",
"DESIRED_CONSISTENCIES": "[\"Session\"]",
"JavaTestVersion": "1.11",
"AdditionalArgs": "-DargLine=\"-DACCOUNT_HOST=https://localhost:8081/\""
},
"Kafka Integration Tests targeting Cosmos Emulator - Java 17": {
"ProfileFlag": "-Pkafka-emulator",
"PROTOCOLS": "[\"Tcp\"]",
"DESIRED_CONSISTENCIES": "[\"Session\"]",
"JavaTestVersion": "1.17",
"AdditionalArgs": "-DargLine=\"-DACCOUNT_HOST=https://localhost:8081/\""
}
}
}
57 changes: 43 additions & 14 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -149,19 +149,6 @@ Licensed under the MIT License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>3.6.0</version> <!-- {x-version-update;cosmos_org.apache.kafka:connect-runtime;external_dependency} -->
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -528,7 +515,28 @@ Licensed under the MIT License.
</build>
</profile>
<profile>
<!-- integration tests, requires Cosmos DB Emulator Endpoint -->
<!-- integration tests with cosmos emulator endpoint -->
<id>kafka-emulator</id>
<properties>
<test.groups>kafka-emulator</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.2.5</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/kafka-emulator-testng.xml</suiteXmlFile>
</suiteXmlFiles>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<!-- integration tests with cosmos prod endpoint -->
<id>kafka</id>
<properties>
<test.groups>kafka</test.groups>
@@ -548,5 +556,26 @@ Licensed under the MIT License.
</plugins>
</build>
</profile>
<profile>
<!-- integration tests with test containers; also requires Cosmos DB prod endpoints -->
<id>kafka-integration</id>
<properties>
<test.groups>kafka-integration</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.2.5</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/kafka-integration-testng.xml</suiteXmlFile>
</suiteXmlFiles>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
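
A note on how these profiles are exercised: each one sets test.groups and points maven-failsafe at a TestNG suite file (kafka-emulator-testng.xml or kafka-integration-testng.xml), which presumably selects tests tagged with the matching group. The following is a minimal, hypothetical sketch of an integration test that the kafka-emulator suite could pick up; the class name, package, and assertion are illustrative and not part of this commit — only the "kafka-emulator" group name and the ACCOUNT_HOST system property come from the changes above.

// Hypothetical sketch: a TestNG test tagged with the group that the
// kafka-emulator profile's suite file would select. Names are assumptions.
package com.azure.cosmos.kafka.connect;

import org.testng.Assert;
import org.testng.annotations.Test;

public class KafkaEmulatorSmokeTest {

    // ACCOUNT_HOST is supplied through -DargLine in the emulator CI matrix.
    @Test(groups = { "kafka-emulator" }, timeOut = 60 * 1000)
    public void emulatorEndpointIsConfigured() {
        String accountHost = System.getProperty("ACCOUNT_HOST", "https://localhost:8081/");
        Assert.assertTrue(accountHost.startsWith("https://"));
    }
}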
@@ -12,15 +12,15 @@
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.IMetadataReader;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataKafkaStorageManager;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTaskConfig;
import com.azure.cosmos.kafka.connect.implementation.source.FeedRangeContinuationTopicOffset;
import com.azure.cosmos.kafka.connect.implementation.source.FeedRangeTaskUnit;
import com.azure.cosmos.kafka.connect.implementation.source.FeedRangesMetadataTopicOffset;
import com.azure.cosmos.kafka.connect.implementation.source.IMetadataReader;
import com.azure.cosmos.kafka.connect.implementation.source.KafkaCosmosChangeFeedState;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataCosmosStorageManager;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataKafkaStorageManager;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataMonitorThread;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
import com.azure.cosmos.models.CosmosContainerProperties;
@@ -117,7 +117,6 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
throw new IllegalArgumentException("StorageType " + taskUnits.getLeft().getStorageType() + " is not supported");
}


return taskConfigs;
}

@@ -155,7 +154,6 @@ private IMetadataReader getMetadataReader() {
.getDatabase(this.config.getContainersConfig().getDatabaseName())
.getContainer(this.config.getMetadataConfig().getStorageName());
// validate the metadata container config
// TODO (xinlian-publicPreview): should create the metadata container during runtime? ]
metadataContainer.read()
.doOnNext(containerResponse -> {
PartitionKeyDefinition partitionKeyDefinition = containerResponse.getProperties().getPartitionKeyDefinition();
@@ -319,7 +317,7 @@ private Mono<Map<FeedRange, KafkaCosmosChangeFeedState>> getEffectiveContinuatio
}

// we can not find the continuation offset based on the exact feed range matching
// it means the previous Partition key range could have gone due to container split/merge
// it means the previous partition key range may no longer exist due to a container split/merge, or there is no continuation state yet
// need to find out overlapped feedRanges from offset
return Flux.fromIterable(rangesFromMetadataTopicOffset)
.flatMap(rangeFromOffset -> {
@@ -338,23 +336,43 @@
.collectList()
.flatMap(overlappedFeedRangesFromOffset -> {
if (overlappedFeedRangesFromOffset.size() == 1) {
// split - use the current containerFeedRange, but construct the continuationState based on the feedRange from offset
effectiveContinuationMap.put(
containerFeedRange,
this.getContinuationStateFromOffset(
this.kafkaOffsetStorageReader.getFeedRangeContinuationOffset(databaseName, containerRid, overlappedFeedRangesFromOffset.get(0)),
containerFeedRange));
// a. split - use the current containerFeedRange, but construct the continuationState based on the feedRange from offset
// b. there is no existing feed range continuationToken state yet
FeedRangeContinuationTopicOffset continuationTopicOffset = this.kafkaOffsetStorageReader.getFeedRangeContinuationOffset(
databaseName,
containerRid,
overlappedFeedRangesFromOffset.get(0)
);

if (continuationTopicOffset == null) {
effectiveContinuationMap.put(overlappedFeedRangesFromOffset.get(0), null);
} else {
effectiveContinuationMap.put(
containerFeedRange,
this.getContinuationStateFromOffset(continuationTopicOffset, containerFeedRange));
}

return Mono.just(effectiveContinuationMap);
}

if (overlappedFeedRangesFromOffset.size() > 1) {
// merge - use the feed ranges from the offset
for (FeedRange overlappedRangeFromOffset : overlappedFeedRangesFromOffset) {
effectiveContinuationMap.put(
overlappedRangeFromOffset,
this.getContinuationStateFromOffset(
this.kafkaOffsetStorageReader.getFeedRangeContinuationOffset(databaseName, containerRid, overlappedRangeFromOffset),
overlappedRangeFromOffset));
FeedRangeContinuationTopicOffset continuationTopicOffset =
this.kafkaOffsetStorageReader
.getFeedRangeContinuationOffset(
databaseName,
containerRid,
overlappedRangeFromOffset);
if (continuationTopicOffset == null) {
effectiveContinuationMap.put(overlappedRangeFromOffset, null);
} else {
effectiveContinuationMap.put(
overlappedRangeFromOffset,
this.getContinuationStateFromOffset(
this.kafkaOffsetStorageReader.getFeedRangeContinuationOffset(databaseName, containerRid, overlappedRangeFromOffset),
overlappedRangeFromOffset));
}
}

return Mono.just(effectiveContinuationMap);
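
To make the feed-range reconciliation above easier to follow, here is a self-contained, simplified sketch of the mapping rule (plain string ranges stand in for the SDK's FeedRange and continuation types; nothing below is code from this commit): a container range overlapping exactly one offset range is the split / no-prior-state case and reuses that offset's continuation, while a container range overlapping several offset ranges is the merge case and keeps each offset range with its own continuation.

// Simplified sketch of the split/merge continuation mapping; ranges are
// modeled as [min, max) over strings and are NOT the SDK's FeedRange type.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ContinuationMappingSketch {
    record Range(String min, String max) {
        boolean overlaps(Range other) {
            return this.min.compareTo(other.max) < 0 && other.min.compareTo(this.max) < 0;
        }
    }

    public static void main(String[] args) {
        // continuation tokens previously persisted in the offset topic, keyed by feed range
        Map<Range, String> offsetContinuations = new LinkedHashMap<>();
        offsetContinuations.put(new Range("00", "FF"), "token-1");

        // feed ranges reported by the container after a split of ["00", "FF")
        List<Range> containerRanges = List.of(new Range("00", "7F"), new Range("7F", "FF"));

        Map<Range, String> effectiveContinuations = new LinkedHashMap<>();
        for (Range containerRange : containerRanges) {
            List<Range> overlapped = new ArrayList<>();
            for (Range offsetRange : offsetContinuations.keySet()) {
                if (containerRange.overlaps(offsetRange)) {
                    overlapped.add(offsetRange);
                }
            }

            if (overlapped.size() == 1) {
                // split (or no prior continuation): keep the container feed range and
                // reuse the continuation recorded for the single overlapping offset range
                effectiveContinuations.put(containerRange, offsetContinuations.get(overlapped.get(0)));
            } else if (overlapped.size() > 1) {
                // merge: keep each offset feed range together with its own continuation
                for (Range offsetRange : overlapped) {
                    effectiveContinuations.put(offsetRange, offsetContinuations.get(offsetRange));
                }
            }
        }

        // prints {Range[min=00, max=7F]=token-1, Range[min=7F, max=FF]=token-1}
        System.out.println(effectiveContinuations);
    }
}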
@@ -16,9 +16,9 @@
import java.util.Map;

public class CosmosClientStore {
// TODO[Public Preview]: revalidate how to get the active directory endpoint map. It suppose to come from management SDK.
private static final Map<CosmosAzureEnvironment, String> ACTIVE_DIRECTORY_ENDPOINT_MAP;
static {
// for now we maintain a static list within the SDK, as these values do not change very frequently
ACTIVE_DIRECTORY_ENDPOINT_MAP = new HashMap<>();
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE, "https://login.microsoftonline.com/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_CHINA, "https://login.chinacloudapi.cn/");
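
For context, the login endpoints in this map are the values one would hand to an azure-identity credential as its authority host when targeting a sovereign cloud. A rough, hypothetical usage sketch follows — the connector's actual credential wiring is not part of this hunk, and the tenant/client values are placeholders.

// Hypothetical sketch: using a per-cloud AAD login endpoint as the authority host
// for an azure-identity credential. Placeholder values; not code from this commit.
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;

public class AadAuthorityHostSketch {
    public static void main(String[] args) {
        // e.g. ACTIVE_DIRECTORY_ENDPOINT_MAP.get(CosmosAzureEnvironment.AZURE_CHINA)
        String authorityHost = "https://login.chinacloudapi.cn/";

        ClientSecretCredential credential = new ClientSecretCredentialBuilder()
            .authorityHost(authorityHost)
            .tenantId("<tenant-id>")
            .clientId("<client-id>")
            .clientSecret("<client-secret>")
            .build();

        System.out.println("credential targets authority host: " + authorityHost);
    }
}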
@@ -38,7 +38,6 @@ public class CosmosSinkConfig extends KafkaCosmosConfig {
private static final String BULK_ENABLED_DISPLAY = "enable bulk mode.";
private static final boolean DEFAULT_BULK_ENABLED = true;

// TODO[Public Preview]: Add other write config, for example patch, bulkUpdate
public static final String BULK_MAX_CONCURRENT_PARTITIONS = SINK_CONFIG_PREFIX + "bulk.maxConcurrentCosmosPartitions";
private static final String BULK_MAX_CONCURRENT_PARTITIONS_DOC =
"Cosmos DB Item Write Max Concurrent Cosmos Partitions."
@@ -109,7 +108,6 @@ public class CosmosSinkConfig extends KafkaCosmosConfig {
"A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2.";
private static final String CONTAINERS_TOPIC_MAP_DISPLAY = "Topic-Container map";

// TODO[Public preview]: re-examine idStrategy implementation
// id.strategy
public static final String ID_STRATEGY_CONF = SINK_CONFIG_PREFIX + "id.strategy";
public static final String ID_STRATEGY_DOC =
@@ -129,8 +127,6 @@ public class CosmosSinkConfig extends KafkaCosmosConfig {
// [.]op[(](.*)[)]: patch operation mapping
public static final Pattern PATCH_PROPERTY_CONFIG_PATTERN = Pattern.compile("(?i)property[(](.*?)[)]([.]path[(](.*)[)])*[.]op[(](.*)[)]$");

// TODO[Public Preview] Verify whether compression need to happen in connector

private final CosmosSinkWriteConfig writeConfig;
private final CosmosSinkContainersConfig containersConfig;
private final IdStrategyType idStrategyType;
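
The PATCH_PROPERTY_CONFIG_PATTERN above accepts patch-operation mappings shaped like property(<name>).path(<json-path>).op(<operation>). A minimal sketch of matching such a value against that regex follows — the sample input string is inferred from the pattern itself and is not taken from this commit.

// Minimal sketch: parsing a patch-operation config value with the pattern above.
// The sample input is an assumption based on the regex, not from this commit.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatchConfigPatternSketch {
    private static final Pattern PATCH_PROPERTY_CONFIG_PATTERN =
        Pattern.compile("(?i)property[(](.*?)[)]([.]path[(](.*)[)])*[.]op[(](.*)[)]$");

    public static void main(String[] args) {
        Matcher matcher = PATCH_PROPERTY_CONFIG_PATTERN.matcher("property(name).path(/name).op(set)");
        if (matcher.matches()) {
            System.out.println("property: " + matcher.group(1)); // name
            System.out.println("path: " + matcher.group(3));     // /name
            System.out.println("op: " + matcher.group(4));       // set
        }
    }
}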
@@ -53,8 +53,6 @@ public void start(Map<String, String> props) {
this.sinkTaskConfig.getThroughputControlConfig(),
context.errantRecordReporter());
}

// TODO[public preview]: in V1, it will create the database if does not exists, but why?
}

private CosmosAsyncClient getThroughputControlCosmosClient() {
@@ -4,9 +4,7 @@
package com.azure.cosmos.kafka.connect.implementation.sink;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
@@ -15,7 +13,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -73,15 +70,8 @@ protected String getEtag(Object recordValue) {
protected PartitionKey getPartitionKeyValue(Object recordValue, PartitionKeyDefinition partitionKeyDefinition) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

//TODO[Public Preview]: add support for sub-partition
String partitionKeyPath = StringUtils.join(partitionKeyDefinition.getPaths(), "");
Map<String, Object> recordMap = (Map<String, Object>) recordValue;
Object partitionKeyValue = recordMap.get(partitionKeyPath.substring(1));

return ImplementationBridgeHelpers
.PartitionKeyHelper
.getPartitionKeyAccessor()
.toPartitionKey(Collections.singletonList(partitionKeyValue), false);
return PartitionKey.fromItem(recordMap, partitionKeyDefinition);
}

protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRetryCount) {
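
The rewritten getPartitionKeyValue above now delegates to PartitionKey.fromItem(recordMap, partitionKeyDefinition), which also covers hierarchical (sub-partition) key definitions instead of only a single top-level path. A hedged sketch of that call in isolation follows — it assumes the same fromItem overload used in the hunk above plus the standard PartitionKeyDefinition setters, and the paths and values are made up.

// Hedged sketch of PartitionKey.fromItem with a hierarchical partition key definition.
// Sample paths and values are illustrative; assumes the fromItem overload used above.
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.PartitionKind;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PartitionKeyFromItemSketch {
    public static void main(String[] args) {
        PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition();
        partitionKeyDefinition.setPaths(Arrays.asList("/tenantId", "/userId"));
        partitionKeyDefinition.setKind(PartitionKind.MULTI_HASH);

        Map<String, Object> recordMap = new HashMap<>();
        recordMap.put("id", "item-1");
        recordMap.put("tenantId", "contoso");
        recordMap.put("userId", "user-42");

        // resolves ["contoso", "user-42"] as the sub-partitioned partition key value
        PartitionKey partitionKey = PartitionKey.fromItem(recordMap, partitionKeyDefinition);
        System.out.println(partitionKey);
    }
}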
@@ -11,7 +11,7 @@ public enum ItemWriteStrategy {
ITEM_OVERWRITE_IF_NOT_MODIFIED("ItemOverwriteIfNotModified"),
ITEM_PATCH("ItemPatch");

// TODO[Public Preview] Add ItemPatch, ItemBulkUpdate
// TODO[GA] Add ItemBulkUpdate
private final String name;

ItemWriteStrategy(String name) {
@@ -15,7 +15,7 @@
import java.util.List;
import java.util.Map;

// TODO[Public Preview]: Double check logic here, copied over from V1
// TODO[GA]: Double check logic here, copied over from V1
public class StructToJsonMap {

public static Map<String, Object> toJsonMap(Struct struct) {
@@ -94,7 +94,7 @@ public class CosmosSourceConfig extends KafkaCosmosConfig {
private static final String MESSAGE_KEY_FIELD_CONFIG = SOURCE_CONFIG_PREFIX + "messageKey.field";
private static final String MESSAGE_KEY_FIELD_CONFIG_DOC = "The field to use as the message key.";
private static final String MESSAGE_KEY_FIELD_CONFIG_DISPLAY = "Kafka message key field.";
private static final String DEFAULT_MESSAGE_KEY_FIELD = "id"; // TODO: should we use pk instead?
private static final String DEFAULT_MESSAGE_KEY_FIELD = "id";

private final CosmosSourceContainersConfig containersConfig;
private final CosmosMetadataConfig metadataConfig;
@@ -6,17 +6,19 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.guava25.base.Stopwatch;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
@@ -130,23 +132,39 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {

// add the containers metadata record - it tracks the databaseName -> List[containerRid] mapping
Pair<ContainersMetadataTopicPartition, ContainersMetadataTopicOffset> containersMetadata = taskUnit.getContainersMetadata();

// convert the containers metadata offset into a Kafka Connect Schema and Struct value
SchemaAndValue containersMetadataSchemaAndValue = JsonToStruct.recordToSchemaAndValue(
Utils.getSimpleObjectMapper().convertValue(
ContainersMetadataTopicOffset.toMap(containersMetadata.getRight()),
ObjectNode.class));

sourceRecords.add(
new SourceRecord(
ContainersMetadataTopicPartition.toMap(containersMetadata.getLeft()),
ContainersMetadataTopicOffset.toMap(containersMetadata.getRight()),
taskUnit.getStorageName(),
SchemaAndValue.NULL.schema(),
SchemaAndValue.NULL.value()));
Schema.STRING_SCHEMA,
containersMetadata.getLeft().getDatabaseName(),
containersMetadataSchemaAndValue.schema(),
containersMetadataSchemaAndValue.value()));

// add the container feedRanges metadata record - it tracks the containerRid -> List[FeedRange] mapping
for (Pair<FeedRangesMetadataTopicPartition, FeedRangesMetadataTopicOffset> feedRangesMetadata : taskUnit.getFeedRangesMetadataList()) {
SchemaAndValue feedRangeMetadataSchemaAndValue = JsonToStruct.recordToSchemaAndValue(
Utils.getSimpleObjectMapper().convertValue(
FeedRangesMetadataTopicOffset.toMap(feedRangesMetadata.getRight()),
ObjectNode.class));

sourceRecords.add(
new SourceRecord(
FeedRangesMetadataTopicPartition.toMap(feedRangesMetadata.getLeft()),
FeedRangesMetadataTopicOffset.toMap(feedRangesMetadata.getRight()),
taskUnit.getStorageName(),
SchemaAndValue.NULL.schema(),
SchemaAndValue.NULL.value()));
Schema.STRING_SCHEMA,
feedRangesMetadata.getLeft().getDatabaseName() + "_" + feedRangesMetadata.getLeft().getContainerRid(),
feedRangeMetadataSchemaAndValue.schema(),
feedRangeMetadataSchemaAndValue.value()));
}

LOGGER.info("There are {} metadata records being created/updated", sourceRecords.size());
