diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md index 0728e53f64ca4..9b640bfbb8f35 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md @@ -5,7 +5,7 @@ * Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084) * Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101) * Fixed an issue preventing driver programs to cleanly shutting down due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137) -* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) +* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299) ### 4.6.1 (2022-02-11) #### Key Bug Fixes diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md index 8275a83ff1c7b..271e03871aecb 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md @@ -5,7 +5,7 @@ * Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084) * Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101) * Fixed an issue preventing driver programs to cleanly shutting down due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137) -* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) +* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299) ### 4.6.1 (2022-02-11) #### Key Bug Fixes diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala index 14e56b843e98c..65198a47a2d02 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala @@ -89,6 +89,7 @@ private case class ItemsPartitionReader private lazy val iterator = new TransientIOErrorsRetryingIterator( continuationToken => { + if (!Strings.isNullOrWhiteSpace(continuationToken)) { ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount( queryOptions, continuationToken, readConfig.maxItemCount) @@ -99,6 +100,14 @@ private case class ItemsPartitionReader // scalastyle:on null } + queryOptions.setMaxBufferedItemCount( + math.min( + readConfig.maxItemCount * readConfig.prefetchBufferSize.toLong, // converting to long to avoid overflow when + // multiplying to ints + java.lang.Integer.MAX_VALUE + ).toInt + ) + ImplementationBridgeHelpers .CosmosQueryRequestOptionsHelper .getCosmosQueryRequestOptionsAccessor diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 608bc66b032b0..b7c1daffe2d44 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,7 +7,7 @@ #### Breaking Changes #### Bugs Fixed -* Fixed an issue in `CosmosPagedIterable` resulting in excessive memory consumption due to unbounded prefetch of pages when converting the `CosmosPagedIterable` into an `Iterator>`. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) +* Fixed an issue in `CosmosPagedIterable` resulting in excessive memory consumption due to unbounded prefetch of pages when converting the `CosmosPagedIterable` into an `Iterator>`. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299) * Fixed a `NullPointerException` in `CosmosDiagnostics isDiagnosticsCapturedInPagedFlux` - See [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java index 07b79c893aeb4..b4277c708be90 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java @@ -183,7 +183,9 @@ public Flux produceAsync() { executeRequestFuncWithRetries, resourceType, top, - pageSize) + pageSize, + Paginator.getPreFetchCount(cosmosQueryRequestOptions, top, pageSize) + ) .map(rsp -> { lastResponseContinuationToken = rsp.getContinuationToken(); this.fetchExecutionRangeAccumulator.endFetchRange(rsp.getActivityId(), diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index e7799ec30069c..d1a4339bdf6f9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -35,12 +35,15 @@ public static Flux> getPaginatedQueryResult Class resourceType, int maxPageSize) { + int top = -1; return getPaginatedQueryResultAsObservable( ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions), createRequestFunc, executeFunc, resourceType, - -1, maxPageSize); + top, + maxPageSize, + getPreFetchCount(cosmosQueryRequestOptions, top, maxPageSize)); } public static Flux> getPaginatedQueryResultAsObservable( @@ -49,7 +52,8 @@ public static Flux> getPaginatedQueryResult Function>> executeFunc, Class resourceType, int top, - int maxPageSize) { + int maxPageSize, + int maxPreFetchCount) { return getPaginatedQueryResultAsObservable( continuationToken, @@ -58,6 +62,7 @@ public static Flux> getPaginatedQueryResult resourceType, top, maxPageSize, + maxPreFetchCount, false); } @@ -118,6 +123,7 @@ private static Flux> getPaginatedQueryResul Class resourceType, int top, int maxPageSize, + int preFetchCount, boolean isChangeFeed) { return getPaginatedQueryResultAsObservable( @@ -128,6 +134,18 @@ private static Flux> getPaginatedQueryResul isChangeFeed, top, maxPageSize), - Queues.XS_BUFFER_SIZE); + preFetchCount); + } + + public static int getPreFetchCount(CosmosQueryRequestOptions queryOptions, int top, int maxPageSize) { + int maxBufferedItemCount = queryOptions != null ? queryOptions.getMaxBufferedItemCount() : 0; + if (maxBufferedItemCount <= 0) { + return Queues.XS_BUFFER_SIZE; + } + int effectivePageSize = top > 0 ? + Math.min(top, maxPageSize) : + Math.max(1, maxPageSize); + int prefetch = Math.max(1, maxBufferedItemCount / effectivePageSize); + return Math.min(prefetch, Queues.XS_BUFFER_SIZE); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/PaginatorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/PaginatorTest.java new file mode 100644 index 0000000000000..fc325c6851714 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/PaginatorTest.java @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.ModelBridgeInternal; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class PaginatorTest { + @Test(groups = { "unit" }, dataProvider = "queryParams") + public void preFetchCalculation( + CosmosQueryRequestOptions testQueryOptions, + int testTop, + int testPageSize, + int expectedPreFetchCount) { + + assertThat( + Paginator + .getPreFetchCount(testQueryOptions, testTop, testPageSize) + ).isEqualTo(expectedPreFetchCount); + } + + @DataProvider(name = "queryParams") + public static Object[][] queryParamProvider() { + + CosmosQueryRequestOptions optionsWithBufferSizeTenThousandAndDifferentMaxItemCount = + new CosmosQueryRequestOptions().setMaxBufferedItemCount(10_000); + // initial continuation token + ModelBridgeInternal + .setQueryRequestOptionsContinuationTokenAndMaxItemCount( + optionsWithBufferSizeTenThousandAndDifferentMaxItemCount, + "someContinuation", + 1); // maxItemCount 1 to test that explicit page Size trumps query options + + CosmosQueryRequestOptions optionsWithBufferSizeTenThousand = + new CosmosQueryRequestOptions().setMaxBufferedItemCount(10_000); + // initial continuation token + ModelBridgeInternal + .setQueryRequestOptionsContinuationTokenAndMaxItemCount( + optionsWithBufferSizeTenThousand, + "someContinuation", + 1_000); + + CosmosQueryRequestOptions optionsWithMaxIntAsMaxItemCount = + new CosmosQueryRequestOptions().setMaxBufferedItemCount(Integer.MAX_VALUE); + + return new Object[][] { + //options, top, pageSize, expectedPreFetchCount + { optionsWithBufferSizeTenThousand, -1, 1_000, 10 }, // top ignored + { optionsWithBufferSizeTenThousandAndDifferentMaxItemCount, -1, 1_000, 10 }, // explicit pageSize wins + { optionsWithBufferSizeTenThousand, 0, 1_000, 10 }, // top ignored + { optionsWithBufferSizeTenThousand, 500, 1_000, 20 }, // effective page size is top + { optionsWithBufferSizeTenThousand, 100, 1_000, 32 }, // effective page size is top - should result in 100 + // but max prefetch count is 32 + { optionsWithBufferSizeTenThousand, -1, -1, 32 }, // effective pageSize is at least 1 + { optionsWithBufferSizeTenThousand, -1, 0, 32 }, // effective pageSize is at least 1 + { optionsWithBufferSizeTenThousand, -1, 20_000, 1 }, // at least 1 page is buffered even when + // maxBufferedItemCount < maxItemCount + { optionsWithMaxIntAsMaxItemCount, -1, Integer.MAX_VALUE, 1 }, // Exactly 1 page is buffered when + // maxBufferedItemCount == maxItemCount + }; + } +}