From 59ecccd23adff3907b3875bd24ed085dcd880b15 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 27 Nov 2024 14:59:14 -0800 Subject: [PATCH 1/4] fix badRequestException when using serverless database account type --- .../kafka/connect/CosmosSourceConnector.java | 18 ++++++++- .../KafkaCosmosExceptionsHelper.java | 8 ++++ .../connect/CosmosSourceConnectorITest.java | 4 +- .../http/Http2ConnectionConfig.java | 37 ------------------- 4 files changed, 27 insertions(+), 40 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index ad56aae6b7b1b..6533a458840dd 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -215,12 +215,28 @@ private boolean shouldCreateMetadataContainerIfNotExists() { } private Mono createMetadataContainer() { + return this.createMetadataContainer(METADATA_CONTAINER_DEFAULT_RU_CONFIG) + .onErrorResume(throwable -> { + // for serverless database account, creating container with throughput configured will get 400/0 exceptions + if (KafkaCosmosExceptionsHelper.isBadRequestException(throwable)) { + LOGGER.info( + "Getting exception '{}' when creating metadata container with throughput, retrying creating without throughput.", + throwable.getMessage()); + return this.createMetadataContainer(null); + } + + return Mono.error(throwable); + }); + } + + private Mono createMetadataContainer(Integer throughput) { return this.cosmosClient .getDatabase(this.config.getContainersConfig().getDatabaseName()) .createContainer( this.config.getMetadataConfig().getStorageName(), "/id", - ThroughputProperties.createAutoscaledThroughput(METADATA_CONTAINER_DEFAULT_RU_CONFIG)); + throughput == null ? + null : ThroughputProperties.createAutoscaledThroughput(throughput)); } private void updateMetadataRecordsInCosmos(MetadataTaskUnit metadataTaskUnit) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java index de2ab238f4e1c..ee030e7f5c0d5 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java @@ -94,4 +94,12 @@ public static boolean isOwnerResourceNotExistsException(Throwable throwable) { && cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND && cosmosException.getSubStatusCode() == HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS; } + + public static boolean isBadRequestException(Throwable throwable) { + if (throwable instanceof CosmosException) { + return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.BADREQUEST; + } + + return false; + } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java index a9c5b94f0d8e1..4aa1785aadba0 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java @@ -50,8 +50,8 @@ public class CosmosSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteB @BeforeClass(groups = { "kafka-integration" }) public void before_CosmosSourceConnectorITest() { this.client = new CosmosClientBuilder() - .key(TestConfigurations.MASTER_KEY) - .endpoint(TestConfigurations.HOST) + .key(KafkaCosmosTestConfigurations.MASTER_KEY) + .endpoint(KafkaCosmosTestConfigurations.HOST) .endpointDiscoveryEnabled(true) .buildAsyncClient(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java deleted file mode 100644 index f3d005804fd9a..0000000000000 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.implementation.http; - -import com.azure.cosmos.implementation.Configs; - -// [TODO Http2]: when adding public API, making this class public, add setter method -public class Http2ConnectionConfig { - private int maxConnectionPoolSize; - private int minConnectionPoolSize; - private int maxConcurrentStreams; - private boolean enabled; - - public Http2ConnectionConfig() { - this.maxConnectionPoolSize = Configs.getHttp2MaxConnectionPoolSize(); - this.minConnectionPoolSize = Configs.getHttp2MinConnectionPoolSize(); - this.maxConcurrentStreams = Configs.getHttp2MaxConcurrentStreams(); - this.enabled = Configs.isHttp2Enabled(); - } - - public int getMaxConnectionPoolSize() { - return maxConnectionPoolSize; - } - - public int getMinConnectionPoolSize() { - return minConnectionPoolSize; - } - - public int getMaxConcurrentStreams() { - return maxConcurrentStreams; - } - - public boolean isEnabled() { - return enabled; - } -} From 3c202807ccd5ea5385428d76a7ca9ff31a5c4e62 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 27 Nov 2024 15:16:20 -0800 Subject: [PATCH 2/4] update changelog --- sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md index 299c05fe02e2c..e78ff7414a24f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed `BadRequestException` when customer using Serverless CosmosDB database account and metadata container does not exists. - See [PR 43125](https://github.com/Azure/azure-sdk-for-java/pull/43125) #### Other Changes From 15b24568d04afd5a25970436530249f6d7b435f2 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Thu, 28 Nov 2024 10:15:57 -0800 Subject: [PATCH 3/4] revert unnecessary changes --- .../http/Http2ConnectionConfig.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java new file mode 100644 index 0000000000000..f3d005804fd9a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ConnectionConfig.java @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.http; + +import com.azure.cosmos.implementation.Configs; + +// [TODO Http2]: when adding public API, making this class public, add setter method +public class Http2ConnectionConfig { + private int maxConnectionPoolSize; + private int minConnectionPoolSize; + private int maxConcurrentStreams; + private boolean enabled; + + public Http2ConnectionConfig() { + this.maxConnectionPoolSize = Configs.getHttp2MaxConnectionPoolSize(); + this.minConnectionPoolSize = Configs.getHttp2MinConnectionPoolSize(); + this.maxConcurrentStreams = Configs.getHttp2MaxConcurrentStreams(); + this.enabled = Configs.isHttp2Enabled(); + } + + public int getMaxConnectionPoolSize() { + return maxConnectionPoolSize; + } + + public int getMinConnectionPoolSize() { + return minConnectionPoolSize; + } + + public int getMaxConcurrentStreams() { + return maxConcurrentStreams; + } + + public boolean isEnabled() { + return enabled; + } +} From 97e5a523e8ffd9556ac35de5e1b75513495b1c61 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 4 Dec 2024 10:33:18 -0800 Subject: [PATCH 4/4] resolve comments --- .../cosmos/kafka/connect/CosmosSourceConnector.java | 4 ++-- .../implementation/KafkaCosmosExceptionsHelper.java | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index 6533a458840dd..53b9d00ceea6d 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -235,8 +235,8 @@ private Mono createMetadataContainer(Integer throughput .createContainer( this.config.getMetadataConfig().getStorageName(), "/id", - throughput == null ? - null : ThroughputProperties.createAutoscaledThroughput(throughput)); + throughput == null + ? null : ThroughputProperties.createAutoscaledThroughput(throughput)); } private void updateMetadataRecordsInCosmos(MetadataTaskUnit metadataTaskUnit) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java index ee030e7f5c0d5..460f3951c2ef6 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java @@ -96,10 +96,9 @@ public static boolean isOwnerResourceNotExistsException(Throwable throwable) { } public static boolean isBadRequestException(Throwable throwable) { - if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.BADREQUEST; - } - - return false; + CosmosException cosmosException = Utils.as(Exceptions.unwrap(throwable), CosmosException.class); + return cosmosException != null + && cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST + && cosmosException.getSubStatusCode() == HttpConstants.SubStatusCodes.UNKNOWN; } }