diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md index 9d31c150b1abe..e13a3ea46e2ef 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) ### 4.35.0 (2024-11-27) diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md index 05370ba9680fc..dc9fb1ca7a56b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) ### 4.35.0 (2024-11-27) diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index 258bca0398bdd..d4042c11afe74 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) ### 4.35.0 (2024-11-27) diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 3f95bc7111788..6e0fdadf810de 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added options to fine-tune settings for bulk operations. 
- [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) ### 4.35.0 (2024-11-27) diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 70baf75320650..02d6e74ac76fb 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) ### 4.35.0 (2024-11-27) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index f5aeca7eaa1f0..4d0938c1b3b76 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -89,6 +89,7 @@ private[spark] object CosmosConfigNames { val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled" val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations" val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize" + val WriteBulkMinTargetBatchSize = "spark.cosmos.write.bulk.minTargetBatchSize" val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions" val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes" val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize" @@ -195,6 +196,7 @@ private[spark] object CosmosConfigNames { WriteBulkPayloadSizeInBytes, WriteBulkInitialBatchSize, WriteBulkMaxBatchSize, + WriteBulkMinTargetBatchSize, WritePointMaxConcurrency, WritePatchDefaultOperationType, WritePatchColumnConfigs, @@ -1162,6 +1164,7 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy, 
maxMicroBatchPayloadSizeInBytes: Option[Int] = None, initialMicroBatchSize: Option[Int] = None, maxMicroBatchSize: Option[Int] = None, + minTargetMicroBatchSize: Option[Int] = None, flushCloseIntervalInSeconds: Int = 60, maxNoProgressIntervalInSeconds: Int = 180, maxRetryNoProgressIntervalInSeconds: Int = 45 * 60, @@ -1207,6 +1210,15 @@ "too many RUs and you cannot enable thoughput control. NOTE: using throuhgput control is preferred and will." + "result in better throughput while still limiting the RU/s used.") + private val minTargetMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMinTargetBatchSize, + defaultValue = Option.apply(Configs.getMinTargetBulkMicroBatchSize), + mandatory = false, + parseFromStringFunction = minTargetBatchSizeString => Math.max(minTargetBatchSizeString.toInt, Configs.getMinTargetBulkMicroBatchSize), + helpMessage = "Cosmos DB min. target bulk micro batch size - a micro batch will be flushed to the backend " + + "when the number of documents enqueued exceeds the target micro batch size. The target micro batch size is " + + "calculated based on the throttling rate. This setting can be used to force the target batch size to have " + + "at least a certain size. 
NOTE: This should only be modified in rare edge cases.") + private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations, mandatory = false, parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt, @@ -1445,6 +1457,7 @@ private object CosmosWriteConfig { val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes) val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize) val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize) + val minTargetBatchSizeOpt = CosmosConfigEntry.parse(cfg, minTargetMicroBatchSize) val writeRetryCommitInterceptor = CosmosConfigEntry .parse(cfg, writeOnRetryCommitInterceptor).flatten @@ -1477,6 +1490,7 @@ private object CosmosWriteConfig { maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt, initialMicroBatchSize = initialBatchSizeOpt, maxMicroBatchSize = maxBatchSizeOpt, + minTargetMicroBatchSize = minTargetBatchSizeOpt, flushCloseIntervalInSeconds = CosmosConfigEntry.parse(cfg, flushCloseIntervalInSeconds).get, maxNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxNoProgressIntervalInSeconds).get, maxRetryNoProgressIntervalInSeconds = CosmosConfigEntry.parse(cfg, maxRetryNoProgressIntervalInSeconds).get, diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala index e86b4bf913fab..315fef9dcf87f 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala @@ -34,18 +34,20 @@ class SparkE2EWriteITest itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true, initialBatchSize: Option[Int] = None, - maxBatchSize: Option[Int] = None) + maxBatchSize: 
Option[Int] = None, + minTargetBatchSize: Option[Int] = None) private val upsertParameterTest = Seq( - UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None), - UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None), - UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5)), - UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None), - UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None) + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None), + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None, minTargetBatchSize = None), + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5), minTargetBatchSize = None), + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5), minTargetBatchSize = Some(2)), + UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None), + UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None, minTargetBatchSize = None) ) - for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize) <- upsertParameterTest) { - it should s"support 
upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize" in { + for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize, minTargetBatchSize) <- upsertParameterTest) { + it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize, minTargetBatchSize = $minTargetBatchSize" in { val cosmosEndpoint = TestConfigurations.HOST val cosmosMasterKey = TestConfigurations.MASTER_KEY @@ -90,6 +92,18 @@ class SparkE2EWriteITest case None => } + minTargetBatchSize match { + case Some(customMinTargetBatchSize) => + configMapBuilder += ( + "spark.cosmos.write.bulk.minTargetBatchSize" -> customMinTargetBatchSize.toString, + ) + + configOverrideMapBuilder += ( + "spark.cosmos.write.bulk.minTargetBatchSize" -> customMinTargetBatchSize.toString, + ) + case None => + } + val cfg = configMapBuilder.toMap val cfgOverwrite = configOverrideMapBuilder.toMap diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholdsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholdsTest.java index dd3520b8be37a..d8b38187f9622 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholdsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholdsTest.java @@ -41,7 +41,7 @@ public void neverThrottledShouldResultInMaxBatchSize() { } @Test(groups = { "unit" }) - public void alwaysThrottledShouldResultInBatSizeOfOne() { + public void alwaysThrottledShouldResultInBatchSizeOfOne() { String pkRangeId = UUID.randomUUID().toString(); PartitionScopeThresholds thresholds = new PartitionScopeThresholds(pkRangeId, new 
CosmosBulkExecutionOptionsImpl()); @@ -71,5 +71,13 @@ public void initialTargetMicroBatchSize() { bulkOperations.setMaxMicroBatchSize(maxBatchSize); thresholds = new PartitionScopeThresholds(pkRangeId, bulkOperations); assertThat(thresholds.getTargetMicroBatchSizeSnapshot()).isEqualTo(maxBatchSize); + + // initial targetBatchSize should be at least minTargetBatchSize + int minTargetBatchSize = 5; + bulkOperations = new CosmosBulkExecutionOptionsImpl(); + bulkOperations.setInitialMicroBatchSize(1); + bulkOperations.setMinTargetMicroBatchSize(minTargetBatchSize); + thresholds = new PartitionScopeThresholds(pkRangeId, bulkOperations); + assertThat(thresholds.getTargetMicroBatchSizeSnapshot()).isEqualTo(minTargetBatchSize); } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 784f1ebf24fdb..c3332da920f84 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -12,8 +12,8 @@ * Added support to enable http2 for gateway mode with system property `COSMOS.HTTP2_ENABLED` and system variable `COSMOS_HTTP2_ENABLED`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MAX_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MIN_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MIN_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) -* Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONCURRENT_STREAMS` and system variable `COSMOS_HTTP2_MAX_CONCURRENT_STREAMS`. 
- [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) - +* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509) + ### 4.65.0 (2024-11-19) #### Features Added diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 2ec99d1e2dc30..7f07616919e06 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -208,6 +208,19 @@ public class Configs { public static final String PREVENT_INVALID_ID_CHARS_VARIABLE = "COSMOS_PREVENT_INVALID_ID_CHARS"; public static final boolean DEFAULT_PREVENT_INVALID_ID_CHARS = false; + // Bulk default settings + public static final String MIN_TARGET_BULK_MICRO_BATCH_SIZE = "COSMOS.MIN_TARGET_BULK_MICRO_BATCH_SIZE"; + public static final String MIN_TARGET_BULK_MICRO_BATCH_SIZE_VARIABLE = "COSMOS_MIN_TARGET_BULK_MICRO_BATCH_SIZE"; + public static final int DEFAULT_MIN_TARGET_BULK_MICRO_BATCH_SIZE = 1; + + public static final String MAX_BULK_MICRO_BATCH_CONCURRENCY = "COSMOS.MAX_BULK_MICRO_BATCH_CONCURRENCY"; + public static final String MAX_BULK_MICRO_BATCH_CONCURRENCY_VARIABLE = "COSMOS_MAX_BULK_MICRO_BATCH_CONCURRENCY"; + public static final int DEFAULT_MAX_BULK_MICRO_BATCH_CONCURRENCY = 1; + + public static final String MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS = "COSMOS.MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS"; + public static final String MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS_VARIABLE = "COSMOS_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS"; + public static final int DEFAULT_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS = 1000; + // Config of CodingErrorAction on charset decoder for malformed input public static final String 
CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT = "COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT"; public static final String DEFAULT_CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT = StringUtils.EMPTY; @@ -495,6 +508,48 @@ public static boolean isIdValueValidationEnabled() { return DEFAULT_PREVENT_INVALID_ID_CHARS; } + public static int getMinTargetBulkMicroBatchSize() { + String valueFromSystemProperty = System.getProperty(MIN_TARGET_BULK_MICRO_BATCH_SIZE); + if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { + return Integer.parseInt(valueFromSystemProperty); + } + + String valueFromEnvVariable = System.getenv(MIN_TARGET_BULK_MICRO_BATCH_SIZE_VARIABLE); + if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) { + return Integer.parseInt(valueFromEnvVariable); + } + + return DEFAULT_MIN_TARGET_BULK_MICRO_BATCH_SIZE; + } + + public static int getMaxBulkMicroBatchConcurrency() { + String valueFromSystemProperty = System.getProperty(MAX_BULK_MICRO_BATCH_CONCURRENCY); + if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { + return Integer.parseInt(valueFromSystemProperty); + } + + String valueFromEnvVariable = System.getenv(MAX_BULK_MICRO_BATCH_CONCURRENCY_VARIABLE); + if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) { + return Integer.parseInt(valueFromEnvVariable); + } + + return DEFAULT_MAX_BULK_MICRO_BATCH_CONCURRENCY; + } + + public static int getMaxBulkMicroBatchFlushIntervalInMs() { + String valueFromSystemProperty = System.getProperty(MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS); + if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { + return Integer.parseInt(valueFromSystemProperty); + } + + String valueFromEnvVariable = System.getenv(MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS_VARIABLE); + if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) { + return Integer.parseInt(valueFromEnvVariable); + } + + return 
DEFAULT_MAX_BULK_MICRO_BATCH_FLUSH_INTERVAL_IN_MILLISECONDS; + } + public static int getMaxHttpRequestTimeout() { String valueFromSystemProperty = System.getProperty(HTTP_MAX_REQUEST_TIMEOUT); if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosBulkExecutionOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosBulkExecutionOptionsImpl.java index 31c0ea08cbcfa..2b6c5b410138c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosBulkExecutionOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosBulkExecutionOptionsImpl.java @@ -32,15 +32,14 @@ */ public class CosmosBulkExecutionOptionsImpl implements OverridableRequestOptions { private int initialMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; - private int maxMicroBatchConcurrency = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_CONCURRENCY; - + private int maxMicroBatchConcurrency = Configs.getMaxBulkMicroBatchConcurrency(); + private int minTargetMicroBatchSize = Configs.getMinTargetBulkMicroBatchSize(); private int maxMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; private double maxMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_RETRY_RATE; private double minMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MIN_MICRO_BATCH_RETRY_RATE; private int maxMicroBatchPayloadSizeInBytes = BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES; - private final Duration maxMicroBatchInterval = Duration.ofMillis( - BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS); + private final Duration maxMicroBatchInterval = Duration.ofMillis(Configs.getMaxBulkMicroBatchFlushIntervalInMs()); private final Object 
legacyBatchScopedContext; private final CosmosBulkExecutionThresholdsState thresholds; private Integer maxConcurrentCosmosPartitions = null; @@ -60,6 +59,7 @@ public CosmosBulkExecutionOptionsImpl(CosmosBulkExecutionOptionsImpl toBeCloned) this.initialMicroBatchSize = toBeCloned.initialMicroBatchSize; this.maxMicroBatchConcurrency = toBeCloned.maxMicroBatchConcurrency; this.maxMicroBatchSize = toBeCloned.maxMicroBatchSize; + this.minTargetMicroBatchSize = toBeCloned.minTargetMicroBatchSize; this.maxMicroBatchRetryRate = toBeCloned.maxMicroBatchRetryRate; this.minMicroBatchRetryRate = toBeCloned.minMicroBatchRetryRate; this.maxMicroBatchPayloadSizeInBytes = toBeCloned.maxMicroBatchPayloadSizeInBytes; @@ -130,6 +130,14 @@ public void setMaxMicroBatchSize(int maxMicroBatchSize) { this.maxMicroBatchSize = maxMicroBatchSize; } + public int getMinTargetMicroBatchSize() { + return minTargetMicroBatchSize; + } + + public void setMinTargetMicroBatchSize(int minTargetMicroBatchSize) { + this.minTargetMicroBatchSize = minTargetMicroBatchSize; + } + public CosmosItemSerializer getCustomItemSerializer() { return this.customSerializer; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java index 72111f5e7884e..713a82242a51a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstants.java @@ -12,31 +12,22 @@ public final class BatchRequestResponseConstants { // Size limits: public static final int DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201; public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100; - - public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 1000; public 
static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_AFTER_DRAINING_INCOMING_FLUX_IN_MILLISECONDS = 100; - public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 1; public static final double DEFAULT_MIN_MICRO_BATCH_RETRY_RATE = 0.1; public static final double DEFAULT_MAX_MICRO_BATCH_RETRY_RATE = 0.2; static final String FIELD_OPERATION_TYPE = "operationType"; - static final String FIELD_RESOURCE_TYPE = "resourceType"; - static final String FIELD_TIME_TO_LIVE_IN_SECONDS = "timeToLiveInSeconds"; static final String FIELD_ID = "id"; - static final String FIELD_INDEXING_DIRECTIVE = "indexingDirective"; static final String FIELD_IF_MATCH = "ifMatch"; static final String FIELD_IF_NONE_MATCH = "ifNoneMatch"; static final String FIELD_PARTITION_KEY = "partitionKey"; static final String FIELD_RESOURCE_BODY = "resourceBody"; - static final String FIELD_BINARY_ID = "binaryId"; - static final String FIELD_EFFECTIVE_PARTITIONKEY = "effectivePartitionKey"; static final String FIELD_STATUS_CODE = "statusCode"; static final String FIELD_SUBSTATUS_CODE = "subStatusCode"; static final String FIELD_REQUEST_CHARGE = "requestCharge"; static final String FIELD_RETRY_AFTER_MILLISECONDS = "retryAfterMilliseconds"; static final String FIELD_ETAG = "eTag"; static final String FIELD_MINIMAL_RETURN_PREFERENCE = "minimalReturnPreference"; - static final String FIELD_IS_CLIENTENCRYPTED = "isClientEncrypted"; // Batch supported operation type for json public static final String OPERATION_CREATE = "Create"; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java index 1736837219231..f5f951920ff3e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java 
@@ -3,10 +3,9 @@ package com.azure.cosmos.implementation.batch; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.CosmosBulkExecutionOptionsImpl; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; -import com.azure.cosmos.models.CosmosBulkExecutionOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ public class PartitionScopeThresholds { private final double maxRetryRate; private final double avgRetryRate; private final int maxMicroBatchSize; + private final int minTargetMicroBatchSize; public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptionsImpl options) { checkNotNull(pkRangeId, "expected non-null pkRangeId"); @@ -46,9 +46,15 @@ public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptionsImpl this.maxMicroBatchSize = Math.min( options.getMaxMicroBatchSize(), BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST); + this.minTargetMicroBatchSize = Math.max( + options.getMinTargetMicroBatchSize(), + Configs.getMinTargetBulkMicroBatchSize() + ); this.targetMicroBatchSize = new AtomicInteger( - Math.min(options.getInitialMicroBatchSize(), this.maxMicroBatchSize)); + Math.max( + Math.min(options.getInitialMicroBatchSize(), this.maxMicroBatchSize), + Math.min(this.minTargetMicroBatchSize, this.maxMicroBatchSize))); } public String getPartitionKeyRangeId() { @@ -107,17 +113,25 @@ private void reevaluateThresholds( int microBatchSizeAfter = microBatchSizeBefore; if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) { - int targetedNewBatchSize = Math.min( + int targetedNewBatchSize = Math.min( - microBatchSizeBefore * 2, - microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)), - maxMicroBatchSize); + Math.max( + Math.min( + microBatchSizeBefore * 2, + microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)), + 
this.minTargetMicroBatchSize), + this.maxMicroBatchSize); if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) { microBatchSizeAfter = targetedNewBatchSize; } } else if (!onlyUpscale && retryRate > this.maxRetryRate && microBatchSizeBefore > 1) { double deltaRate = retryRate - this.avgRetryRate; - int targetedNewBatchSize = Math.max(1, (int) (microBatchSizeBefore * (1 - deltaRate))); + int targetedNewBatchSize = + Math.min( + Math.max( + this.minTargetMicroBatchSize, + (int) (microBatchSizeBefore * (1 - deltaRate))), + this.maxMicroBatchSize); if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) { microBatchSizeAfter = targetedNewBatchSize; }