Skip to content

Commit

Permalink
Fixes yet another place where we prefetch too aggressively in the que…
Browse files Browse the repository at this point in the history
…ry path (#27299)

* Fixing NPE in CosmosPagedFlux when trying to access non-existing diagnostics

* Iterating on the change to limit prefetch memory consumption in CosmosPagedIterable in Spark query/change feed scenarios

* Fixing tests

* Fixing typo in comment

Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com>

* Updating changelogs

* Fixes yet another place where we prefetch too aggressively in the query path

* Update PaginatorTest.java

* Updated changelogs

* Fixing unintentional reformatting

Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com>

* Update Paginator.java

* Update ItemsPartitionReader.scala

Co-authored-by: Matias Quaranta <ealsur@users.noreply.github.com>
  • Loading branch information
FabianMeiswinkel and ealsur authored Feb 25, 2022
1 parent c3e9b68 commit cc10d8a
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 7 deletions.
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
* Fixed an issue preventing driver programs to cleanly shutting down due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299)

### 4.6.1 (2022-02-11)
#### Key Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
* Fixed an issue preventing driver programs to cleanly shutting down due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299)

### 4.6.1 (2022-02-11)
#### Key Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private case class ItemsPartitionReader

private lazy val iterator = new TransientIOErrorsRetryingIterator(
continuationToken => {

if (!Strings.isNullOrWhiteSpace(continuationToken)) {
ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(
queryOptions, continuationToken, readConfig.maxItemCount)
Expand All @@ -99,6 +100,14 @@ private case class ItemsPartitionReader
// scalastyle:on null
}

queryOptions.setMaxBufferedItemCount(
math.min(
readConfig.maxItemCount * readConfig.prefetchBufferSize.toLong, // converting to long to avoid overflow when
// multiplying to ints
java.lang.Integer.MAX_VALUE
).toInt
)

ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue in `CosmosPagedIterable` resulting in excessive memory consumption due to unbounded prefetch of pages when converting the `CosmosPagedIterable` into an `Iterator<FeedResponse<T>>`. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237)
* Fixed an issue in `CosmosPagedIterable` resulting in excessive memory consumption due to unbounded prefetch of pages when converting the `CosmosPagedIterable` into an `Iterator<FeedResponse<T>>`. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27299](https://github.com/Azure/azure-sdk-for-java/pull/27299)
* Fixed a `NullPointerException` in `CosmosDiagnostics isDiagnosticsCapturedInPagedFlux` - See [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)

#### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ public Flux<DocumentProducerFeedResponse> produceAsync() {
executeRequestFuncWithRetries,
resourceType,
top,
pageSize)
pageSize,
Paginator.getPreFetchCount(cosmosQueryRequestOptions, top, pageSize)
)
.map(rsp -> {
lastResponseContinuationToken = rsp.getContinuationToken();
this.fetchExecutionRangeAccumulator.endFetchRange(rsp.getActivityId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResult
Class<T> resourceType,
int maxPageSize) {

int top = -1;
return getPaginatedQueryResultAsObservable(
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions),
createRequestFunc,
executeFunc,
resourceType,
-1, maxPageSize);
top,
maxPageSize,
getPreFetchCount(cosmosQueryRequestOptions, top, maxPageSize));
}

public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
Expand All @@ -49,7 +52,8 @@ public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResult
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Class<T> resourceType,
int top,
int maxPageSize) {
int maxPageSize,
int maxPreFetchCount) {

return getPaginatedQueryResultAsObservable(
continuationToken,
Expand All @@ -58,6 +62,7 @@ public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResult
resourceType,
top,
maxPageSize,
maxPreFetchCount,
false);
}

Expand Down Expand Up @@ -118,6 +123,7 @@ private static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResul
Class<T> resourceType,
int top,
int maxPageSize,
int preFetchCount,
boolean isChangeFeed) {

return getPaginatedQueryResultAsObservable(
Expand All @@ -128,6 +134,18 @@ private static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResul
isChangeFeed,
top,
maxPageSize),
Queues.XS_BUFFER_SIZE);
preFetchCount);
}

public static int getPreFetchCount(CosmosQueryRequestOptions queryOptions, int top, int maxPageSize) {
int maxBufferedItemCount = queryOptions != null ? queryOptions.getMaxBufferedItemCount() : 0;
if (maxBufferedItemCount <= 0) {
return Queues.XS_BUFFER_SIZE;
}
int effectivePageSize = top > 0 ?
Math.min(top, maxPageSize) :
Math.max(1, maxPageSize);
int prefetch = Math.max(1, maxBufferedItemCount / effectivePageSize);
return Math.min(prefetch, Queues.XS_BUFFER_SIZE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class PaginatorTest {
@Test(groups = { "unit" }, dataProvider = "queryParams")
public void preFetchCalculation(
CosmosQueryRequestOptions testQueryOptions,
int testTop,
int testPageSize,
int expectedPreFetchCount) {

assertThat(
Paginator
.getPreFetchCount(testQueryOptions, testTop, testPageSize)
).isEqualTo(expectedPreFetchCount);
}

@DataProvider(name = "queryParams")
public static Object[][] queryParamProvider() {

CosmosQueryRequestOptions optionsWithBufferSizeTenThousandAndDifferentMaxItemCount =
new CosmosQueryRequestOptions().setMaxBufferedItemCount(10_000);
// initial continuation token
ModelBridgeInternal
.setQueryRequestOptionsContinuationTokenAndMaxItemCount(
optionsWithBufferSizeTenThousandAndDifferentMaxItemCount,
"someContinuation",
1); // maxItemCount 1 to test that explicit page Size trumps query options

CosmosQueryRequestOptions optionsWithBufferSizeTenThousand =
new CosmosQueryRequestOptions().setMaxBufferedItemCount(10_000);
// initial continuation token
ModelBridgeInternal
.setQueryRequestOptionsContinuationTokenAndMaxItemCount(
optionsWithBufferSizeTenThousand,
"someContinuation",
1_000);

CosmosQueryRequestOptions optionsWithMaxIntAsMaxItemCount =
new CosmosQueryRequestOptions().setMaxBufferedItemCount(Integer.MAX_VALUE);

return new Object[][] {
//options, top, pageSize, expectedPreFetchCount
{ optionsWithBufferSizeTenThousand, -1, 1_000, 10 }, // top ignored
{ optionsWithBufferSizeTenThousandAndDifferentMaxItemCount, -1, 1_000, 10 }, // explicit pageSize wins
{ optionsWithBufferSizeTenThousand, 0, 1_000, 10 }, // top ignored
{ optionsWithBufferSizeTenThousand, 500, 1_000, 20 }, // effective page size is top
{ optionsWithBufferSizeTenThousand, 100, 1_000, 32 }, // effective page size is top - should result in 100
// but max prefetch count is 32
{ optionsWithBufferSizeTenThousand, -1, -1, 32 }, // effective pageSize is at least 1
{ optionsWithBufferSizeTenThousand, -1, 0, 32 }, // effective pageSize is at least 1
{ optionsWithBufferSizeTenThousand, -1, 20_000, 1 }, // at least 1 page is buffered even when
// maxBufferedItemCount < maxItemCount
{ optionsWithMaxIntAsMaxItemCount, -1, Integer.MAX_VALUE, 1 }, // Exactly 1 page is buffered when
// maxBufferedItemCount == maxItemCount
};
}
}

0 comments on commit cc10d8a

Please sign in to comment.