diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md index 299c05fe02e2c..e78ff7414a24f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed `BadRequestException` when customer using Serverless CosmosDB database account and metadata container does not exists. - See [PR 43125](https://github.com/Azure/azure-sdk-for-java/pull/43125) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index ad56aae6b7b1b..53b9d00ceea6d 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -215,12 +215,28 @@ private boolean shouldCreateMetadataContainerIfNotExists() { } private Mono createMetadataContainer() { + return this.createMetadataContainer(METADATA_CONTAINER_DEFAULT_RU_CONFIG) + .onErrorResume(throwable -> { + // for serverless database account, creating container with throughput configured will get 400/0 exceptions + if (KafkaCosmosExceptionsHelper.isBadRequestException(throwable)) { + LOGGER.info( + "Getting exception '{}' when creating metadata container with throughput, retrying creating without throughput.", + throwable.getMessage()); + return this.createMetadataContainer(null); + } + + return Mono.error(throwable); + }); + } + + private Mono createMetadataContainer(Integer throughput) { return this.cosmosClient .getDatabase(this.config.getContainersConfig().getDatabaseName()) .createContainer( this.config.getMetadataConfig().getStorageName(), "/id", - ThroughputProperties.createAutoscaledThroughput(METADATA_CONTAINER_DEFAULT_RU_CONFIG)); + throughput == null + ? null : ThroughputProperties.createAutoscaledThroughput(throughput)); } private void updateMetadataRecordsInCosmos(MetadataTaskUnit metadataTaskUnit) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java index de2ab238f4e1c..460f3951c2ef6 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java @@ -94,4 +94,11 @@ public static boolean isOwnerResourceNotExistsException(Throwable throwable) { && cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND && cosmosException.getSubStatusCode() == HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS; } + + public static boolean isBadRequestException(Throwable throwable) { + CosmosException cosmosException = Utils.as(Exceptions.unwrap(throwable), CosmosException.class); + return cosmosException != null + && cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST + && cosmosException.getSubStatusCode() == HttpConstants.SubStatusCodes.UNKNOWN; + } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java index a9c5b94f0d8e1..4aa1785aadba0 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java @@ -50,8 +50,8 @@ public class CosmosSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteB @BeforeClass(groups = { "kafka-integration" }) public void before_CosmosSourceConnectorITest() { this.client = new CosmosClientBuilder() - .key(TestConfigurations.MASTER_KEY) - .endpoint(TestConfigurations.HOST) + .key(KafkaCosmosTestConfigurations.MASTER_KEY) + .endpoint(KafkaCosmosTestConfigurations.HOST) .endpointDiscoveryEnabled(true) .buildAsyncClient(); }