diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/FeedOptions.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/FeedOptions.java index 56ea0eb15cb8..496cb8a3258f 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/FeedOptions.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/FeedOptions.java @@ -23,6 +23,7 @@ public final class FeedOptions { private PartitionKey partitionkey; private boolean populateQueryMetrics; private Map properties; + private boolean allowEmptyPages; public FeedOptions() { } @@ -40,6 +41,7 @@ public FeedOptions(FeedOptions options) { this.requestContinuation = options.requestContinuation; this.partitionkey = options.partitionkey; this.populateQueryMetrics = options.populateQueryMetrics; + this.allowEmptyPages = options.allowEmptyPages; } /** @@ -338,4 +340,19 @@ public FeedOptions properties(Map properties) { this.properties = properties; return this; } + + /** + * Gets the option to allow empty result pages in feed response. + */ + public boolean allowEmptyPages() { + return allowEmptyPages; + } + + /** + * Sets the option to allow empty result pages in feed response. Defaults to false + * @param allowEmptyPages whether to allow empty pages in feed response + */ + public void allowEmptyPages(boolean allowEmptyPages) { + this.allowEmptyPages = allowEmptyPages; + } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/HttpConstants.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/HttpConstants.java index 1463693486b9..5f5d62eba5eb 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/HttpConstants.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/HttpConstants.java @@ -78,6 +78,9 @@ public static class HttpHeaders { public static final String IS_QUERY = "x-ms-documentdb-isquery"; public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition"; public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery"; + public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request"; + public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features"; + public static final String QUERY_VERSION = "x-ms-cosmos-query-version"; // Our custom DocDB headers public static final String CONTINUATION = "x-ms-continuation"; @@ -248,6 +251,7 @@ public static class A_IMHeaderValues { public static class Versions { public static final String CURRENT_VERSION = "2018-12-31"; + public static final String QUERY_VERSION = "1.0"; // TODO: FIXME we can use maven plugin for generating a version file // @see diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/OperationType.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/OperationType.java index 47d5c1d50db8..827fab31081d 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/OperationType.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/OperationType.java @@ -36,6 +36,7 @@ public enum OperationType { Replace, Resume, SqlQuery, + QueryPlan, Stop, Throttle, Update, @@ -50,4 +51,4 @@ public boolean isWriteOperation() { this == Upsert || this == Update; } -} \ No newline at end of file +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index 07891c82eaba..06e3f39a6a72 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -111,7 +111,9 @@ private Flux readFeed(RxDocumentServiceRequest reques } private Flux query(RxDocumentServiceRequest request) { - request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + if(request.getOperationType() != OperationType.QueryPlan) { + request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + } switch (this.queryCompatibilityMode) { case SqlQuery: @@ -352,6 +354,7 @@ private Flux invokeAsyncInternal(RxDocumentServiceReq return this.replace(request); case SqlQuery: case Query: + case QueryPlan: return this.query(request); default: throw new IllegalStateException("Unknown operation type " + request.getOperationType()); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Utils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Utils.java index 5f7d08012be4..b6e9c3bd4a9a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Utils.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Utils.java @@ -499,22 +499,22 @@ public static O as(I i, Class klass) { return null; } } - + @SuppressWarnings("unchecked") public static List immutableListOf() { return Collections.EMPTY_LIST; } - + public static List immutableListOf(V v1) { List list = new ArrayList<>(); list.add(v1); return Collections.unmodifiableList(list); } - + public static MapimmutableMapOf() { return Collections.emptyMap(); } - + public static MapimmutableMapOf(K k1, V v1) { Map map = new HashMap(); map.put(k1, v1); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java index 2d24f4915e1a..a8b36aa12ea1 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java @@ -29,12 +29,14 @@ import com.azure.data.cosmos.internal.routing.PartitionKeyInternal; import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity; import com.azure.data.cosmos.internal.routing.Range; +import com.azure.data.cosmos.internal.routing.RoutingMapProviderHelper; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -85,16 +87,16 @@ public Flux> executeAsync() { if (feedOptions == null) { feedOptions = new FeedOptions(); } - + FeedOptions newFeedOptions = new FeedOptions(feedOptions); - + // We can not go to backend with the composite continuation token, // but we still need the gateway for the query plan. // The workaround is to try and parse the continuation token as a composite continuation token. // If it is, then we send the query to the gateway with max degree of parallelism to force getting back the query plan - + String originalContinuation = newFeedOptions.requestContinuation(); - + if (isClientSideContinuationToken(originalContinuation)) { // At this point we know we want back a query plan newFeedOptions.requestContinuation(null); @@ -115,8 +117,14 @@ public Flux> executeAsync() { public Mono> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { // TODO: FIXME this needs to be revisited - Range r = new Range<>("", "FF", true, false); - return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null); + return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges); + } + + public Mono> getTargetPartitionKeyRangesById(String resourceId, String partitionKeyRangeIdInternal) { + return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId, + partitionKeyRangeIdInternal, + false, + null).flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange))); } protected Function>> executeInternalAsyncFunc() { @@ -222,7 +230,7 @@ public RxDocumentServiceRequest createRequestAsync(String continuationToken, Int return request; } - + private static boolean isClientSideContinuationToken(String continuationToken) { if (continuationToken != null) { ValueHolder outCompositeContinuationToken = new ValueHolder(); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextBase.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextBase.java index 982b81e2d9e0..1b723b4427ac 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextBase.java @@ -207,7 +207,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part } if (this.resourceTypeEnum.isPartitioned()) { - request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id())); + boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null; + if(!hasPartitionKey){ + request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id())); + } } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java index a6f704e596b6..47c390f42f53 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java @@ -4,20 +4,26 @@ import com.azure.data.cosmos.BadRequestException; import com.azure.data.cosmos.BridgeInternal; +import com.azure.data.cosmos.CommonsBridgeInternal; import com.azure.data.cosmos.internal.DocumentCollection; import com.azure.data.cosmos.FeedOptions; -import com.azure.data.cosmos.PartitionKey; import com.azure.data.cosmos.Resource; import com.azure.data.cosmos.SqlQuerySpec; +import com.azure.data.cosmos.internal.HttpConstants; import com.azure.data.cosmos.internal.OperationType; import com.azure.data.cosmos.internal.PartitionKeyRange; import com.azure.data.cosmos.internal.ResourceType; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import com.azure.data.cosmos.internal.Strings; import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxCollectionCache; +import com.azure.data.cosmos.internal.routing.PartitionKeyInternal; +import com.azure.data.cosmos.internal.routing.Range; +import org.apache.commons.lang3.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -60,37 +66,82 @@ public static Flux> proxyQueryExecutionContext = - collectionObs.flatMap(collection -> { - if (feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.None)) { - feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collection.getPartitionKey()))); - } - return ProxyDocumentQueryExecutionContext.createAsync( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collection, - isContinuationExpected, - correlatedActivityId); - }).switchIfEmpty(ProxyDocumentQueryExecutionContext.createAsync( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - null, - isContinuationExpected, - correlatedActivityId)); - - return proxyQueryExecutionContext; + DefaultDocumentQueryExecutionContext queryExecutionContext = new DefaultDocumentQueryExecutionContext( + client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + correlatedActivityId, + isContinuationExpected); + + if (ResourceType.Document != resourceTypeEnum) { + return Flux.just(queryExecutionContext); + } + + Mono queryExecutionInfoMono = + com.azure.data.cosmos.internal.query.QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink); + + return collectionObs.single().flatMap(collection -> + queryExecutionInfoMono.flatMap(partitionedQueryExecutionInfo -> { + QueryInfo queryInfo = + partitionedQueryExecutionInfo.getQueryInfo(); + // Non value aggregates must go through + // DefaultDocumentQueryExecutionContext + // Single partition query can serve queries like SELECT AVG(c + // .age) FROM c + // SELECT MIN(c.age) + 5 FROM c + // SELECT MIN(c.age), MAX(c.age) FROM c + // while pipelined queries can only serve + // SELECT VALUE . So we send the query down the old + // pipeline to avoid a breaking change. + // Should be fixed by adding support for nonvalueaggregates + if (queryInfo.hasAggregates() && !queryInfo.hasSelectValue()) { + if (feedOptions != null && feedOptions.enableCrossPartitionQuery()) { + return Mono.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST, + "Cross partition query only supports 'VALUE " + + "' for aggregates")); + } + return Mono.just(queryExecutionContext); + } + + Mono> partitionKeyRanges; + // The partitionKeyRangeIdInternal is no more a public API on FeedOptions, but have the below condition + // for handling ParallelDocumentQueryTest#partitionKeyRangeId + if (feedOptions != null && !StringUtils.isEmpty(CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions))) { + partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRangesById(collection.resourceId(), + CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions)); + } else { + List> queryRanges = + partitionedQueryExecutionInfo.getQueryRanges(); + + if (feedOptions != null && feedOptions.partitionKey() != null) { + PartitionKeyInternal internalPartitionKey = + feedOptions.partitionKey() + .getInternalPartitionKey(); + Range range = Range.getPointRange(internalPartitionKey + .getEffectivePartitionKeyString(internalPartitionKey, + collection.getPartitionKey())); + queryRanges = Collections.singletonList(range); + } + partitionKeyRanges = queryExecutionContext + .getTargetPartitionKeyRanges(collection.resourceId(), queryRanges); + } + return partitionKeyRanges + .flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + isContinuationExpected, + partitionedQueryExecutionInfo, + pkranges, + collection.resourceId(), + correlatedActivityId).single()); + + })).flux(); } public static Flux> createSpecializedDocumentQueryExecutionContextAsync( @@ -106,7 +157,12 @@ public static Flux 0 || initialPageSize == -1, "MaxItemCount", "Invalid MaxItemCount %s", initialPageSize); @@ -116,6 +172,10 @@ public static Flux outOffsetContinuationToken) { + if (StringUtils.isEmpty(serializedOffsetContinuationToken)) { + return false; + } + + boolean parsed; + try { + outOffsetContinuationToken.v = new OffsetContinuationToken(serializedOffsetContinuationToken); + parsed = true; + } catch (Exception ex) { + logger.debug("Received exception {} when trying to parse: {}", + ex.getMessage(), + serializedOffsetContinuationToken); + parsed = false; + outOffsetContinuationToken.v = null; + } + + return parsed; + } + + public String getSourceToken() { + return super.getString(TOKEN_PROPERTY_NAME); + } + + private void setSourceToken(String sourceToken) { + BridgeInternal.setProperty(this, TOKEN_PROPERTY_NAME, sourceToken); + } + + public int getOffset() { + return super.getInt(OFFSET_PROPERTY_NAME); + } + + private void setOffset(int offset) { + BridgeInternal.setProperty(this, OFFSET_PROPERTY_NAME, offset); + } +} + diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContext.java index cd84e15bca88..e75efe49ed63 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContext.java @@ -34,6 +34,7 @@ */ public class ParallelDocumentQueryExecutionContext extends ParallelDocumentQueryExecutionContextBase { + private FeedOptions feedOptions; private ParallelDocumentQueryExecutionContext( IDocumentQueryClient client, @@ -50,6 +51,7 @@ private ParallelDocumentQueryExecutionContext( UUID correlatedActivityId) { super(client, partitionKeyRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink, rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId); + this.feedOptions = feedOptions; } public static Flux> createAsync( @@ -169,9 +171,10 @@ private static class EmptyPagesFilterTransformer implements Function.DocumentProducerFeedResponse>, Flux>> { private final RequestChargeTracker tracker; private DocumentProducer.DocumentProducerFeedResponse previousPage; + private final FeedOptions feedOptions; public EmptyPagesFilterTransformer( - RequestChargeTracker tracker) { + RequestChargeTracker tracker, FeedOptions options) { if (tracker == null) { throw new IllegalArgumentException("Request Charge Tracker must not be null."); @@ -179,6 +182,7 @@ public EmptyPagesFilterTransformer( this.tracker = tracker; this.previousPage = null; + this.feedOptions = options; } private DocumentProducer.DocumentProducerFeedResponse plusCharge( @@ -222,7 +226,8 @@ public Flux> apply(Flux.DocumentProducerFeed // Emit an empty page so the downstream observables know when there are no more // results. return source.filter(documentProducerFeedResponse -> { - if (documentProducerFeedResponse.pageResult.results().isEmpty()) { + if (documentProducerFeedResponse.pageResult.results().isEmpty() + && !this.feedOptions.allowEmptyPages()) { // filter empty pages and accumulate charge tracker.addCharge(documentProducerFeedResponse.pageResult.requestCharge()); return false; @@ -304,7 +309,7 @@ public Flux> drainAsync( .map(DocumentProducer::produceAsync) // Merge results from all partitions. .collect(Collectors.toList()); - return Flux.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker())); + return Flux.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), this.feedOptions)); } @Override diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContextBase.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContextBase.java index e63832fb0f3b..5b3bf3fa9105 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ParallelDocumentQueryExecutionContextBase.java @@ -68,6 +68,12 @@ protected void initialize(String collectionRid, Map headers = new HashMap<>(commonRequestHeaders); headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); + if (feedOptions.partitionKey() != null) { + headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, feedOptions + .partitionKey() + .getInternalPartitionKey() + .toJson()); + } return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyRange, collectionRid); }; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/PipelinedDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/PipelinedDocumentQueryExecutionContext.java index af8760bf79eb..699ef1ab0dec 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/PipelinedDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/PipelinedDocumentQueryExecutionContext.java @@ -82,14 +82,41 @@ public static Flux>> createSkipComponentFunction; + if (queryInfo.hasOffset()) { + createSkipComponentFunction = (continuationToken) -> { + return SkipDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, + queryInfo.getOffset(), + continuationToken); + }; + } else { + createSkipComponentFunction = createAggregateComponentFunction; + } + Function>> createTopComponentFunction; if (queryInfo.hasTop()) { createTopComponentFunction = (continuationToken) -> { - return TopDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, - queryInfo.getTop(), continuationToken); + return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction, + queryInfo.getTop(), queryInfo.getTop(), continuationToken); + }; + } else { + createTopComponentFunction = createSkipComponentFunction; + } + + Function>> createTakeComponentFunction; + if (queryInfo.hasLimit()) { + createTakeComponentFunction = (continuationToken) -> { + int totalLimit = queryInfo.getLimit(); + if (queryInfo.hasOffset()) { + // This is being done to match the limit from rewritten query + totalLimit = queryInfo.getOffset() + queryInfo.getLimit(); + } + return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction, + queryInfo.getLimit(), totalLimit, + continuationToken); }; } else { - createTopComponentFunction = createAggregateComponentFunction; + createTakeComponentFunction = createTopComponentFunction; } int actualPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(), @@ -100,7 +127,7 @@ public static Flux new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId)); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java deleted file mode 100644 index a0bf1a54df7c..000000000000 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.azure.data.cosmos.internal.query; - -import com.azure.data.cosmos.CosmosClientException; -import com.azure.data.cosmos.internal.DocumentCollection; -import com.azure.data.cosmos.FeedOptions; -import com.azure.data.cosmos.FeedResponse; -import com.azure.data.cosmos.Resource; -import com.azure.data.cosmos.SqlQuerySpec; -import com.azure.data.cosmos.internal.Exceptions; -import com.azure.data.cosmos.internal.HttpConstants; -import com.azure.data.cosmos.internal.PartitionKeyRange; -import com.azure.data.cosmos.internal.ResourceType; -import com.azure.data.cosmos.internal.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.UUID; -import java.util.function.Function; - -/** - * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. - * - * This class is used as a proxy to wrap the - * DefaultDocumentQueryExecutionContext which is needed for sending the query to - * GATEWAY first and then uses PipelinedDocumentQueryExecutionContext after it - * gets the necessary info. - */ -public class ProxyDocumentQueryExecutionContext implements IDocumentQueryExecutionContext { - - private IDocumentQueryExecutionContext innerExecutionContext; - private IDocumentQueryClient client; - private ResourceType resourceTypeEnum; - private Class resourceType; - private FeedOptions feedOptions; - private SqlQuerySpec query; - private String resourceLink; - private DocumentCollection collection; - private UUID correlatedActivityId; - private boolean isContinuationExpected; - private final static Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public ProxyDocumentQueryExecutionContext( - IDocumentQueryExecutionContext innerExecutionContext, - IDocumentQueryClient client, - ResourceType resourceTypeEnum, - Class resourceType, - SqlQuerySpec query, - FeedOptions feedOptions, - String resourceLink, - DocumentCollection collection, - boolean isContinuationExpected, - UUID correlatedActivityId) { - this.innerExecutionContext = innerExecutionContext; - - this.client = client; - this.resourceTypeEnum = resourceTypeEnum; - this.resourceType = resourceType; - this.query = query; - this.feedOptions = feedOptions; - this.resourceLink = resourceLink; - - this.collection = collection; - this.isContinuationExpected = isContinuationExpected; - this.correlatedActivityId = correlatedActivityId; - } - - @Override - public Flux> executeAsync() { - - Function>> func = throwable -> { - - Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); - - logger.debug("Received non result message from gateway", unwrappedException); - if (!(unwrappedException instanceof Exception)) { - logger.error("Unexpected failure", unwrappedException); - return Flux.error(unwrappedException); - } - - if (!isCrossPartitionQuery((Exception) unwrappedException)) { - // If this is not a cross partition query then propagate error - logger.debug("Failure from gateway", unwrappedException); - return Flux.error(unwrappedException); - } - - logger.debug("Setting up query pipeline using the query plan received form gateway"); - - // cross partition query construct pipeline - - CosmosClientException dce = (CosmosClientException) unwrappedException; - - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new - PartitionedQueryExecutionInfo(dce.error().getPartitionedQueryExecutionInfo()); - - logger.debug("Query Plan from gateway {}", partitionedQueryExecutionInfo); - - DefaultDocumentQueryExecutionContext queryExecutionContext = - (DefaultDocumentQueryExecutionContext) this.innerExecutionContext; - - Mono> partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRanges(collection.resourceId(), - partitionedQueryExecutionInfo.getQueryRanges()); - - Flux> exContext = partitionKeyRanges.flux() - .flatMap(pkranges -> DocumentQueryExecutionContextFactory.createSpecializedDocumentQueryExecutionContextAsync( - this.client, - this.resourceTypeEnum, - this.resourceType, - this.query, - this.feedOptions, - this.resourceLink, - isContinuationExpected, - partitionedQueryExecutionInfo, - pkranges, - this.collection.resourceId(), - this.correlatedActivityId)); - - return exContext.flatMap(IDocumentQueryExecutionContext::executeAsync); - }; - - return this.innerExecutionContext.executeAsync().onErrorResume(func); - } - - private boolean isCrossPartitionQuery(Exception exception) { - - CosmosClientException clientException = Utils.as(exception, CosmosClientException.class); - - if (clientException == null) { - return false; - } - - return (Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.BADREQUEST) && - Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.CROSS_PARTITION_QUERY_NOT_SERVABLE)); - } - - public static Flux> createAsync(IDocumentQueryClient client, - ResourceType resourceTypeEnum, Class resourceType, SqlQuerySpec query, FeedOptions feedOptions, - String resourceLink, DocumentCollection collection, boolean isContinuationExpected, - UUID correlatedActivityId) { - - IDocumentQueryExecutionContext innerExecutionContext = - new DefaultDocumentQueryExecutionContext( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - correlatedActivityId, - isContinuationExpected); - - return Flux.just(new ProxyDocumentQueryExecutionContext(innerExecutionContext, client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collection, - isContinuationExpected, - correlatedActivityId)); - } -} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryFeature.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryFeature.java new file mode 100644 index 000000000000..5b3f9d784573 --- /dev/null +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryFeature.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.data.cosmos.internal.query; + +public enum QueryFeature { + None, + Aggregate, + CompositeAggregate, + Distinct, + GroupBy, + MultipleAggregates, + MultipleOrderBy, + OffsetAndLimit, + OrderBy, + Top +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryInfo.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryInfo.java index b610cf6630c4..7910c32543e2 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryInfo.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryInfo.java @@ -14,11 +14,14 @@ * Used internally to encapsulates a query's information in the Azure Cosmos DB database service. */ public final class QueryInfo extends JsonSerializable { + private static final String HAS_SELECT_VALUE = "hasSelectValue"; private Integer top; private List orderBy; private Collection aggregates; private Collection orderByExpressions; private String rewrittenQuery; + private Integer offset; + private Integer limit; public QueryInfo() { } @@ -68,4 +71,25 @@ public Collection getOrderByExpressions() { ? this.orderByExpressions : (this.orderByExpressions = super.getCollection("orderByExpressions", String.class)); } -} \ No newline at end of file + + public boolean hasSelectValue(){ + return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE); + } + + public boolean hasOffset() { + return this.getOffset() != null; + } + + public boolean hasLimit() { + return this.getLimit() != null; + } + + public Integer getLimit() { + return this.limit != null ? this.limit : (this.limit = super.getInt("limit")); + } + + public Integer getOffset() { + return this.offset != null ? this.offset : (this.offset = super.getInt("offset")); + } +} + diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryPlanRetriever.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryPlanRetriever.java new file mode 100644 index 000000000000..a2626fae9c00 --- /dev/null +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/QueryPlanRetriever.java @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.data.cosmos.internal.query; + +import com.azure.data.cosmos.SqlQuerySpec; +import com.azure.data.cosmos.internal.BackoffRetryUtility; +import com.azure.data.cosmos.internal.HttpConstants; +import com.azure.data.cosmos.internal.IDocumentClientRetryPolicy; +import com.azure.data.cosmos.internal.OperationType; +import com.azure.data.cosmos.internal.ResourceType; +import com.azure.data.cosmos.internal.RuntimeConstants; +import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +class QueryPlanRetriever { + private static final String TRUE = "True"; + private static final String SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + + QueryFeature.CompositeAggregate.name() + ", " + + QueryFeature.MultipleOrderBy.name() + ", " + + QueryFeature.OrderBy.name() + ", " + + QueryFeature.OffsetAndLimit.name() + ", " + + QueryFeature.Top.name(); + + static Mono getQueryPlanThroughGatewayAsync(IDocumentQueryClient queryClient, + SqlQuerySpec sqlQuerySpec, + String resourceLink) { + final Map requestHeaders = new HashMap<>(); + requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON); + requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE); + requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, SUPPORTED_QUERY_FEATURES); + requestHeaders.put(HttpConstants.HttpHeaders.QUERY_VERSION, HttpConstants.Versions.QUERY_VERSION); + + final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.QueryPlan, + ResourceType.Document, + resourceLink, + requestHeaders); + request.UseGatewayMode = true; + request.setContentBytes(sqlQuerySpec.toJson().getBytes(StandardCharsets.UTF_8)); + + final IDocumentClientRetryPolicy retryPolicyInstance = + queryClient.getResetSessionTokenRetryPolicy().getRequestPolicy(); + + Function> executeFunc = req -> { + return BackoffRetryUtility.executeRetry(() -> { + retryPolicyInstance.onBeforeSendRequest(req); + return queryClient.executeQueryAsync(request).flatMap(rxDocumentServiceResponse -> { + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = + new PartitionedQueryExecutionInfo(rxDocumentServiceResponse.getReponseBodyAsString()); + return Mono.just(partitionedQueryExecutionInfo); + + }); + }, retryPolicyInstance); + }; + + return executeFunc.apply(request); + } +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/SkipDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/SkipDocumentQueryExecutionContext.java new file mode 100644 index 000000000000..81c824d360eb --- /dev/null +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/SkipDocumentQueryExecutionContext.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.data.cosmos.internal.query; + +import com.azure.data.cosmos.BridgeInternal; +import com.azure.data.cosmos.CosmosClientException; +import com.azure.data.cosmos.FeedResponse; +import com.azure.data.cosmos.Resource; +import com.azure.data.cosmos.internal.HttpConstants; +import com.azure.data.cosmos.internal.Utils; +import reactor.core.publisher.Flux; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public final class SkipDocumentQueryExecutionContext implements IDocumentQueryExecutionComponent { + + private final IDocumentQueryExecutionComponent component; + private int skipCount; + + SkipDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int skipCount) { + if (component == null) { + throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null"); + } + this.component = component; + this.skipCount = skipCount; + } + + public static Flux> createAsync( + Function>> createSourceComponentFunction, + int skipCount, + String continuationToken) { + OffsetContinuationToken offsetContinuationToken; + Utils.ValueHolder outOffsetContinuationToken = new Utils.ValueHolder<>(); + if (continuationToken != null) { + if (!OffsetContinuationToken.tryParse(continuationToken, outOffsetContinuationToken)) { + String message = String.format("Invalid JSON in continuation token %s for Skip~Context", + continuationToken); + CosmosClientException dce = + BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST, + message); + return Flux.error(dce); + } + + offsetContinuationToken = outOffsetContinuationToken.v; + } else { + offsetContinuationToken = new OffsetContinuationToken(skipCount, null); + } + + return createSourceComponentFunction.apply(offsetContinuationToken.getSourceToken()) + .map(component -> new SkipDocumentQueryExecutionContext<>(component, + offsetContinuationToken.getOffset())); + } + + @Override + public Flux> drainAsync(int maxPageSize) { + + return this.component.drainAsync(maxPageSize).map(tFeedResponse -> { + + List documentsAfterSkip = + tFeedResponse.results().stream().skip(this.skipCount).collect(Collectors.toList()); + + int numberOfDocumentsSkipped = tFeedResponse.results().size() - documentsAfterSkip.size(); + this.skipCount -= numberOfDocumentsSkipped; + + Map headers = new HashMap<>(tFeedResponse.responseHeaders()); + if (this.skipCount >= 0) { + // Add Offset Continuation Token + String sourceContinuationToken = tFeedResponse.continuationToken(); + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(this.skipCount, + sourceContinuationToken); + headers.put(HttpConstants.HttpHeaders.CONTINUATION, offsetContinuationToken.toJson()); + } + + return BridgeInternal.createFeedResponseWithQueryMetrics(documentsAfterSkip, headers, + BridgeInternal.queryMetricsFromFeedResponse(tFeedResponse)); + }); + } + + IDocumentQueryExecutionComponent getComponent() { + return this.component; + } +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/TopDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/TopDocumentQueryExecutionContext.java index 72e708d7641f..c5526e71e03b 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/TopDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/TopDocumentQueryExecutionContext.java @@ -20,15 +20,18 @@ public class TopDocumentQueryExecutionContext implements IDo private final IDocumentQueryExecutionComponent component; private final int top; + // limit from rewritten query + private final int limit; - public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top) { + public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top, int limit) { this.component = component; this.top = top; + this.limit = limit; } public static Flux> createAsync( Function>> createSourceComponentFunction, - int topCount, String topContinuationToken) { + int topCount, int limit, String topContinuationToken) { TakeContinuationToken takeContinuationToken; if (topContinuationToken == null) { @@ -56,7 +59,7 @@ public static Flux> cre return createSourceComponentFunction .apply(takeContinuationToken.getSourceToken()) - .map(component -> new TopDocumentQueryExecutionContext<>(component, takeContinuationToken.getTakeCount())); + .map(component -> new TopDocumentQueryExecutionContext<>(component, takeContinuationToken.getTakeCount(), limit)); } @Override @@ -64,13 +67,18 @@ public Flux> drainAsync(int maxPageSize) { ParallelDocumentQueryExecutionContextBase context; if (this.component instanceof AggregateDocumentQueryExecutionContext) { - context = (ParallelDocumentQueryExecutionContextBase) ((AggregateDocumentQueryExecutionContext) this.component) - .getComponent(); + context = + (ParallelDocumentQueryExecutionContextBase) ((AggregateDocumentQueryExecutionContext) this.component) + .getComponent(); + } else if (this.component instanceof SkipDocumentQueryExecutionContext) { + context = + (ParallelDocumentQueryExecutionContextBase) ((SkipDocumentQueryExecutionContext) this.component) + .getComponent(); } else { context = (ParallelDocumentQueryExecutionContextBase) this.component; } - context.setTop(this.top); + context.setTop(this.limit); return this.component.drainAsync(maxPageSize).takeUntil(new Predicate>() { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/metrics/QueryMetricsTextWriter.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/metrics/QueryMetricsTextWriter.java index 4cbfc94cc568..6a6744b48234 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/metrics/QueryMetricsTextWriter.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/metrics/QueryMetricsTextWriter.java @@ -178,7 +178,7 @@ protected void writeIndexHitRatio(double indexHitRatio) { @Override protected void writeTotalQueryExecutionTime(Duration totalQueryExecutionTime) { - QueryMetricsTextWriter.appendNanosecondsToStringBuilder(stringBuilder, + QueryMetricsTextWriter.appendMillisecondsToStringBuilder(stringBuilder, QueryMetricsTextWriter.TotalQueryExecutionTime, durationToMilliseconds(totalQueryExecutionTime), 0); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyInternal.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyInternal.java index 54d2022cca1b..a839c211109a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyInternal.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyInternal.java @@ -3,6 +3,7 @@ package com.azure.data.cosmos.internal.routing; +import com.azure.data.cosmos.PartitionKeyDefinition; import com.azure.data.cosmos.internal.Undefined; import com.azure.data.cosmos.internal.RMResources; import com.azure.data.cosmos.internal.Strings; @@ -220,6 +221,10 @@ public List getComponents() { return components; } + public String getEffectivePartitionKeyString(PartitionKeyInternal internalPartitionKey, PartitionKeyDefinition partitionKey) { + return PartitionKeyInternalHelper.getEffectivePartitionKeyString(internalPartitionKey, partitionKey); + } + @SuppressWarnings("serial") static final class PartitionKeyInternalJsonSerializer extends StdSerializer { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelper.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelper.java index 8668f4d69cdd..ea5b990f3999 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelper.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelper.java @@ -3,11 +3,15 @@ package com.azure.data.cosmos.internal.routing; +import com.azure.data.cosmos.internal.IRoutingMapProvider; import com.azure.data.cosmos.internal.PartitionKeyRange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.ListIterator; /** * Provide utility functionality to route request in direct connectivity mode in the Azure Cosmos DB database service. @@ -19,7 +23,7 @@ private static String max(String left, String right) { return left.compareTo(right) < 0 ? right : left; } - private static > boolean IsSortedAndNonOverlapping(List> list) { + private static > boolean isSortedAndNonOverlapping(List> list) { for (int i = 1; i < list.size(); i++) { Range previousRange = list.get(i - 1); Range currentRange = list.get(i); @@ -37,7 +41,7 @@ private static > boolean IsSortedAndNonOverlapping(List< public static Collection getOverlappingRanges(RoutingMapProvider routingMapProvider, String collectionSelfLink, List> sortedRanges) { - if (!IsSortedAndNonOverlapping(sortedRanges)) { + if (!isSortedAndNonOverlapping(sortedRanges)) { throw new IllegalArgumentException("sortedRanges"); } @@ -74,4 +78,64 @@ public static Collection getOverlappingRanges(RoutingMapProvi return targetRanges; } + + public static Mono> getOverlappingRanges(IRoutingMapProvider routingMapProvider, + String resourceId, List> sortedRanges) { + + if (routingMapProvider == null){ + throw new IllegalArgumentException("routingMapProvider"); + } + + if (sortedRanges == null) { + throw new IllegalArgumentException("sortedRanges"); + } + + if (!isSortedAndNonOverlapping(sortedRanges)) { + throw new IllegalArgumentException("sortedRanges"); + } + + List targetRanges = new ArrayList<>(); + final ListIterator> iterator = sortedRanges.listIterator(); + + return Flux.defer(() -> { + if (!iterator.hasNext()) { + return Flux.empty(); + } + + Range queryRange; + Range sortedRange = iterator.next(); + if (!targetRanges.isEmpty()) { + String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(), + sortedRange.getMin()); + + boolean leftInclusive = left.compareTo(sortedRange.getMin()) == 0 && sortedRange.isMinInclusive(); + + queryRange = new Range(left, sortedRange.getMax(), leftInclusive, + sortedRange.isMaxInclusive()); + } else { + queryRange = sortedRange; + } + + return routingMapProvider.tryGetOverlappingRangesAsync(resourceId, queryRange, false, null) + .map(targetRanges::addAll) + .flatMap(aBoolean -> { + if (!targetRanges.isEmpty()) { + Range lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange(); + while (iterator.hasNext()) { + Range value = iterator.next(); + if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) { + // Since we already moved forward on iterator to check above condition, we + // go to previous when it fails so the the value is not skipped on iteration + iterator.previous(); + break; + } + } + } + return Mono.just(targetRanges); + }).flux(); + }).repeat(sortedRanges.size()) + .takeUntil(stringRange -> !iterator.hasNext()) + .last() + .single(); + } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ConsistencyTests2.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ConsistencyTests2.java index 9a0959f6cd87..1cd9eb461edb 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ConsistencyTests2.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ConsistencyTests2.java @@ -70,14 +70,14 @@ public void validateWriteSessionOnAsyncReplication() throws InterruptedException @Test(groups = {"direct"}, timeOut = CONSISTENCY_TEST_TIMEOUT, enabled = false) public void validateEventualConsistencyOnAsyncReplicationDirect() { - //TODO this need to complete once we implement emulator container in java, and the we can do operation + //TODO this need to complete once we implement emulator container in java, and the we can do operation // like pause, resume, stop, recycle on it needed for this test. // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/355053 } @Test(groups = {"direct"}, timeOut = CONSISTENCY_TEST_TIMEOUT, enabled = false) public void validateEventualConsistencyOnAsyncReplicationGateway() { - //TODO this need to complete once we implement emulator container in java, and the we can do operation + //TODO this need to complete once we implement emulator container in java, and the we can do operation // like pause, resume, stop, recycle on it needed for this test. // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/355053 } @@ -238,6 +238,7 @@ public void validateSessionTokenAsync() { try { FeedOptions feedOptions = new FeedOptions(); feedOptions.enableCrossPartitionQuery(true); + feedOptions.allowEmptyPages(true); FeedResponse queryResponse = client.queryDocuments(createdCollection.selfLink(), "SELECT * FROM c WHERE c.Id = " + "'foo'", feedOptions) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/DocumentQuerySpyWireContentTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/DocumentQuerySpyWireContentTest.java index 4ed2b01753e3..a082737ecae5 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/DocumentQuerySpyWireContentTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/DocumentQuerySpyWireContentTest.java @@ -101,7 +101,7 @@ public void queryWithContinuationTokenLimit(FeedOptions options, String query, b .collectList().block(); assertThat(results.size()).describedAs("total results").isGreaterThanOrEqualTo(1); - + List requests = client.getCapturedRequests(); for(HttpRequest req: requests) { @@ -111,17 +111,19 @@ public void queryWithContinuationTokenLimit(FeedOptions options, String query, b private void validateRequestHasContinuationTokenLimit(HttpRequest request, Integer expectedValue) { Map headers = request.headers().toMap(); - if (expectedValue != null && expectedValue > 0) { - assertThat(headers - .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + if(headers.get(HttpConstants.HttpHeaders.IS_QUERY) != null ) { + if (expectedValue != null && expectedValue > 0) { + assertThat(headers + .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) .isTrue(); - assertThat(headers - .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) + assertThat(headers + .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) .isEqualTo(Integer.toString(expectedValue)); - } else { - assertThat(headers - .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + } else { + assertThat(headers + .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) .isFalse(); + } } } @@ -159,7 +161,7 @@ public void beforeClass() throws Exception { FeedOptions options = new FeedOptions(); options.enableCrossPartitionQuery(true); - + // do the query once to ensure the collection is cached. client.queryDocuments(getMultiPartitionCollectionLink(), "select * from root", options) .then().block(); @@ -185,4 +187,4 @@ private static Document getDocumentDefinition(int cnt) { , uuid, cnt, cnt)); return doc; } -} \ No newline at end of file +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java index 42201b08dbf8..4f9a810a2612 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java @@ -3,16 +3,22 @@ package com.azure.data.cosmos.internal.routing; +import com.azure.data.cosmos.internal.IRoutingMapProvider; import com.azure.data.cosmos.internal.PartitionKeyRange; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.mockito.Matchers; +import org.mockito.Mockito; import org.testng.annotations.Test; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -126,4 +132,56 @@ public String apply(PartitionKeyRange range) { assertThat("3,4").isEqualTo(ranges.stream().map(func).collect(Collectors.joining(","))); } + + @Test(groups = {"unit"}) + public void getOverlappingRangesWithList() { + + Function func = new Function() { + @Override + public String apply(PartitionKeyRange range) { + return range.id(); + } + }; + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + List rangeList = Arrays.asList(new PartitionKeyRange("0", "", "000A"), + new PartitionKeyRange("1", "000A", "000D"), + new PartitionKeyRange("2", "000D", "0012"), + new PartitionKeyRange("3", "0012", "0015"), + new PartitionKeyRange("4", "0015", "0020"), + new PartitionKeyRange("5", "0020", "0040"), + new PartitionKeyRange("6", "0040", "FF")); + Mono> listSingle = Mono.just(rangeList); + + Map> resultMap = new HashMap<>(); + + resultMap.put(new Range<>("000D", "0012", true, false), + Collections.singletonList(new PartitionKeyRange("2", "000D", "0012"))); + resultMap.put(new Range<>("0012", "0015", true, false), + Collections.singletonList(new PartitionKeyRange("3", "0012", "0015"))); + resultMap.put(new Range<>("0015", "0020", true, false), + Collections.singletonList(new PartitionKeyRange("4", "0015", "00120"))); + + Mockito.doAnswer(invocationOnMock -> { + Range range = invocationOnMock.getArgumentAt(1, Range.class); + return Mono.just(resultMap.get(range)); + }).when(routingMapProviderMock).tryGetOverlappingRangesAsync(Matchers.anyString(), + Matchers.any(), + Matchers.anyBoolean(), + Matchers.anyMap()); + + Mono> overlappingRanges; + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "/dbs/db1/colls/coll1", + Arrays.asList(new Range("000D", "0012", true, false), + new Range("0012", "0015", true, false), + new Range<>("0015", "0020", true, false))); + assertThat("2,3,4").isEqualTo(overlappingRanges.block().stream().map(func).collect(Collectors.joining(","))); + + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "/dbs/db1/colls/coll1", + Arrays.asList(new Range("000D", "0012", true, false))); + assertThat("2").isEqualTo(overlappingRanges.block().stream().map(func).collect(Collectors.joining(","))); + + } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/AggregateQueryTests.java index 9e37686cf4f9..25cd9beec1c7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/AggregateQueryTests.java @@ -159,9 +159,15 @@ public void generateTestConfigs() { String testName = String.format("%s SinglePartition %s", config.operator, "SELECT VALUE"); queryConfigs.add(new QueryConfig(testName, query, config.expected)); - query = String.format(aggregateSinglePartitionQueryFormatSelect, config.operator, field, partitionKey, uniquePartitionKey); + // Should add support for non value aggregates before enabling these. + // https://github.com/Azure/azure-sdk-for-java/issues/6088 + /* + query = String.format(aggregateSinglePartitionQueryFormatSelect, config.operator, field, partitionKey, + uniquePartitionKey); testName = String.format("%s SinglePartition %s", config.operator, "SELECT"); - queryConfigs.add(new QueryConfig(testName, query, new Document("{'$1':" + removeTrailingZerosIfInteger(config.expected) + "}"))); + queryConfigs.add(new QueryConfig(testName, query, new Document("{'$1':" + removeTrailingZerosIfInteger + (config.expected) + "}"))); + */ } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OffsetLimitQueryTests.java new file mode 100644 index 000000000000..00058ff0415b --- /dev/null +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OffsetLimitQueryTests.java @@ -0,0 +1,219 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.data.cosmos.rx; + +import com.azure.data.cosmos.BridgeInternal; +import com.azure.data.cosmos.CosmosClient; +import com.azure.data.cosmos.CosmosClientBuilder; +import com.azure.data.cosmos.CosmosContainer; +import com.azure.data.cosmos.CosmosDatabase; +import com.azure.data.cosmos.CosmosItemProperties; +import com.azure.data.cosmos.FeedOptions; +import com.azure.data.cosmos.FeedResponse; +import com.azure.data.cosmos.internal.FeedResponseListValidator; +import com.azure.data.cosmos.internal.FeedResponseValidator; +import com.azure.data.cosmos.internal.Utils; +import com.azure.data.cosmos.internal.query.OffsetContinuationToken; +import io.reactivex.subscribers.TestSubscriber; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class OffsetLimitQueryTests extends TestSuiteBase { + private CosmosDatabase createdDatabase; + private CosmosContainer createdCollection; + private ArrayList docs = new ArrayList<>(); + + private String partitionKey = "mypk"; + private int firstPk = 0; + private int secondPk = 1; + private String field = "field"; + + private CosmosClient client; + + @Factory(dataProvider = "clientBuildersWithDirect") + public OffsetLimitQueryTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") + public void queryDocuments(boolean qmEnabled) { + int skipCount = 4; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + FeedOptions options = new FeedOptions(); + options.maxItemCount(5); + options.enableCrossPartitionQuery(true); + options.populateQueryMetrics(qmEnabled); + options.maxDegreeOfParallelism(2); + Flux> queryObservable = createdCollection.queryItems(query, options); + + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder() + .totalSize(takeCount) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .build()) + .hasValidQueryMetrics(qmEnabled) + .build(); + + validateQuerySuccess(queryObservable, validator, TIMEOUT); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void drainAllDocumentsUsingOffsetLimit() { + int skipCount = 0; + int takeCount = 2; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + FeedOptions options = new FeedOptions(); + options.enableCrossPartitionQuery(true); + options.maxDegreeOfParallelism(2); + Flux> queryObservable; + + int totalDocsObtained = 0; + int totalDocs = docs.size(); + int expectedNumCalls = totalDocs / takeCount; + int numCalls = 0; + FeedResponse finalResponse = null; + + while (numCalls < expectedNumCalls) { + query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + queryObservable = createdCollection.queryItems(query, options); + Iterator> iterator = queryObservable.toIterable().iterator(); + while (iterator.hasNext()) { + FeedResponse next = iterator.next(); + totalDocsObtained += next.results().size(); + finalResponse = next; + } + numCalls++; + skipCount += takeCount; + } + assertThat(totalDocsObtained).isEqualTo(docs.size()); + assertThat(finalResponse.continuationToken()).isNull(); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void offsetContinuationTokenRoundTrips() { + // Positive + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(42, "asdf"); + String serialized = offsetContinuationToken.toString(); + Utils.ValueHolder outOffsetContinuationToken = new Utils.ValueHolder<>(); + + assertThat(OffsetContinuationToken.tryParse(serialized, outOffsetContinuationToken)).isTrue(); + OffsetContinuationToken deserialized = outOffsetContinuationToken.v; + + assertThat(deserialized.getOffset()).isEqualTo(42); + assertThat(deserialized.getSourceToken()).isEqualTo("asdf"); + + // Negative + Utils.ValueHolder outTakeContinuationToken = + new Utils.ValueHolder(); + assertThat( + OffsetContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", outTakeContinuationToken)) + .isFalse(); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 10) + public void queryDocumentsWithOffsetContinuationTokens() { + int skipCount = 3; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + this.queryWithContinuationTokensAndPageSizes(query, new int[] {1, 5, 15}, takeCount); + } + + private void queryWithContinuationTokensAndPageSizes(String query, int[] pageSizes, int takeCount) { + for (int pageSize : pageSizes) { + List receivedDocuments = this.queryWithContinuationTokens(query, pageSize); + Set actualIds = new HashSet(); + for (CosmosItemProperties CosmosItemProperties : receivedDocuments) { + actualIds.add(CosmosItemProperties.resourceId()); + } + + assertThat(actualIds.size()).describedAs("total number of results").isEqualTo(takeCount); + } + } + + private List queryWithContinuationTokens(String query, int pageSize) { + String requestContinuation = null; + List continuationTokens = new ArrayList(); + List receivedDocuments = new ArrayList(); + + do { + FeedOptions options = new FeedOptions(); + options.maxItemCount(pageSize); + options.enableCrossPartitionQuery(true); + options.maxDegreeOfParallelism(2); + options.requestContinuation(requestContinuation); + Flux> queryObservable = + createdCollection.queryItems(query, options); + + TestSubscriber> testSubscriber = new TestSubscriber<>(); + queryObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); + testSubscriber.assertNoErrors(); + testSubscriber.assertComplete(); + + FeedResponse firstPage = + (FeedResponse) testSubscriber.getEvents().get(0).get(0); + requestContinuation = firstPage.continuationToken(); + receivedDocuments.addAll(firstPage.results()); + + continuationTokens.add(requestContinuation); + } while (requestContinuation != null); + + return receivedDocuments; + } + + public void bulkInsert() { + generateTestData(); + voidBulkInsertBlocking(createdCollection, docs); + } + + public void generateTestData() { + + for (int i = 0; i < 10; i++) { + CosmosItemProperties d = new CosmosItemProperties(); + d.id(Integer.toString(i)); + BridgeInternal.setProperty(d, field, i); + BridgeInternal.setProperty(d, partitionKey, firstPk); + docs.add(d); + } + + for (int i = 10; i < 20; i++) { + CosmosItemProperties d = new CosmosItemProperties(); + d.id(Integer.toString(i)); + BridgeInternal.setProperty(d, field, i); + BridgeInternal.setProperty(d, partitionKey, secondPk); + docs.add(d); + } + } + + @AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeClose(client); + } + + @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() throws Exception { + client = this.clientBuilder().build(); + createdCollection = getSharedMultiPartitionCosmosContainer(client); + truncateCollection(createdCollection); + + bulkInsert(); + + waitIfNeededForReplicasToCatchUp(clientBuilder()); + } +} + diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OrderbyDocumentQueryTest.java index 8f60fd8dc632..d656a9bee096 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/OrderbyDocumentQueryTest.java @@ -14,7 +14,12 @@ import com.azure.data.cosmos.FeedResponse; import com.azure.data.cosmos.PartitionKey; import com.azure.data.cosmos.Resource; -import com.azure.data.cosmos.internal.*; +import com.azure.data.cosmos.internal.FailureValidator; +import com.azure.data.cosmos.internal.FeedResponseListValidator; +import com.azure.data.cosmos.internal.FeedResponseValidator; +import com.azure.data.cosmos.internal.ResourceValidator; +import com.azure.data.cosmos.internal.RetryAnalyzer; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.Utils.ValueHolder; import com.azure.data.cosmos.internal.query.CompositeContinuationToken; import com.azure.data.cosmos.internal.query.OrderByContinuationToken; @@ -23,13 +28,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.reactivex.subscribers.TestSubscriber; import org.apache.commons.lang3.StringUtils; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Factory; -import org.testng.annotations.Test; -import org.testng.annotations.Ignore; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; import reactor.core.publisher.Flux; import java.util.ArrayList; @@ -45,8 +49,6 @@ import static org.assertj.core.api.Assertions.assertThat; -//FIXME beforeClass times out inconsistently. -@Ignore public class OrderbyDocumentQueryTest extends TestSuiteBase { private final double minQueryRequestChargePerPartition = 2.0; @@ -62,8 +64,6 @@ public OrderbyDocumentQueryTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } - //FIXME test times out inconsistently - @Ignore @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") public void queryDocumentsValidateContent(boolean qmEnabled) throws Exception { CosmosItemProperties expectedDocument = createdDocuments.get(0); @@ -214,7 +214,7 @@ public void queryOrderWithTop(int topValue) throws Exception { Comparator validatorComparator = Comparator.nullsFirst(Comparator.naturalOrder()); - List expectedResourceIds = + List expectedResourceIds = sortDocumentsAndCollectResourceIds("propInt", d -> d.getInt("propInt"), validatorComparator) .stream().limit(topValue).collect(Collectors.toList()); @@ -238,19 +238,6 @@ private List sortDocumentsAndCollectResourceIds(String propName, Fun .map(Resource::resourceId).collect(Collectors.toList()); } - @Test(groups = { "simple" }, timeOut = TIMEOUT) - public void crossPartitionQueryNotEnabled() throws Exception { - String query = "SELECT * FROM r ORDER BY r.propInt"; - FeedOptions options = new FeedOptions(); - Flux> queryObservable = createdCollection.queryItems(query, options); - - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(CosmosClientException.class) - .statusCode(400) - .build(); - validateQueryFailure(queryObservable, validator); - } - @Test(groups = { "simple" }, timeOut = TIMEOUT) public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exception { String query = "SELECT * FROM r ORDER BY r.propScopedPartitionInt ASC"; @@ -292,10 +279,10 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc .allPagesSatisfy(new FeedResponseValidator.Builder() .requestChargeGreaterThanOrEqualTo(1.0).build()) .build(); - + validateQuerySuccess(queryObservable, validator); } - + @Test(groups = { "simple" }, timeOut = TIMEOUT) public void orderByContinuationTokenRoundTrip() throws Exception { { @@ -309,7 +296,7 @@ public void orderByContinuationTokenRoundTrip() throws Exception { false); String serialized = orderByContinuationToken.toString(); ValueHolder outOrderByContinuationToken = new ValueHolder(); - + assertThat(OrderByContinuationToken.tryParse(serialized, outOrderByContinuationToken)).isTrue(); OrderByContinuationToken deserialized = outOrderByContinuationToken.v; CompositeContinuationToken compositeContinuationToken = deserialized.getCompositeContinuationToken(); @@ -320,19 +307,19 @@ public void orderByContinuationTokenRoundTrip() throws Exception { assertThat(range.getMax()).isEqualTo("D"); assertThat(range.isMinInclusive()).isEqualTo(false); assertThat(range.isMaxInclusive()).isEqualTo(true); - + QueryItem[] orderByItems = deserialized.getOrderByItems(); assertThat(orderByItems).isNotNull(); assertThat(orderByItems.length).isEqualTo(1); assertThat(orderByItems[0].getItem()).isEqualTo(42); - + String rid = deserialized.getRid(); assertThat(rid).isEqualTo("rid"); - + boolean inclusive = deserialized.getInclusive(); assertThat(inclusive).isEqualTo(false); } - + { // Negative ValueHolder outOrderByContinuationToken = new ValueHolder(); @@ -348,20 +335,20 @@ public void queryDocumentsWithOrderByContinuationTokensInteger(String sortOrder) // Get Expected Comparator order = sortOrder.equals("ASC")?Comparator.naturalOrder():Comparator.reverseOrder(); Comparator validatorComparator = Comparator.nullsFirst(order); - + List expectedResourceIds = sortDocumentsAndCollectResourceIds("propInt", d -> d.getInt("propInt"), validatorComparator); this.queryWithContinuationTokensAndPageSizes(query, new int[] { 1, 5, 10, 100}, expectedResourceIds); } - + @Test(groups = { "simple" }, timeOut = TIMEOUT * 10, dataProvider = "sortOrder") public void queryDocumentsWithOrderByContinuationTokensString(String sortOrder) throws Exception { // Get Actual String query = String.format("SELECT * FROM c ORDER BY c.id %s", sortOrder); - + // Get Expected Comparator order = sortOrder.equals("ASC")?Comparator.naturalOrder():Comparator.reverseOrder(); Comparator validatorComparator = Comparator.nullsFirst(order); - + List expectedResourceIds = sortDocumentsAndCollectResourceIds("id", d -> d.getString("id"), validatorComparator); this.queryWithContinuationTokensAndPageSizes(query, new int[] { 1, 5, 10, 100 }, expectedResourceIds); } @@ -415,7 +402,6 @@ public void beforeClass() throws Exception { List> keyValuePropsList = new ArrayList<>(); Map props; - for(int i = 0; i < 30; i++) { props = new HashMap<>(); props.put("propInt", i); @@ -450,7 +436,7 @@ public void beforeClass() throws Exception { public void afterClass() { safeClose(client); } - + private void assertInvalidContinuationToken(String query, int[] pageSize, List expectedIds) { String requestContinuation = null; do { @@ -476,7 +462,7 @@ private void assertInvalidContinuationToken(String query, int[] pageSize, List expectedIds) { for (int pageSize : pageSizes) { List receivedDocuments = this.queryWithContinuationTokens(query, pageSize); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ParallelDocumentQueryTest.java index f367b9ec71cc..200b19dc5c03 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ParallelDocumentQueryTest.java @@ -36,8 +36,6 @@ import static com.azure.data.cosmos.CommonsBridgeInternal.partitionKeyRangeIdInternal; import static org.assertj.core.api.Assertions.assertThat; -//FIXME beforeClass times out inconsistently -@Ignore public class ParallelDocumentQueryTest extends TestSuiteBase { private CosmosDatabase createdDatabase; private CosmosContainer createdCollection; @@ -62,8 +60,6 @@ public Object[][] queryMetricsArgProvider() { }; } - //FIXME test times out inconsistently - @Ignore @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") public void queryDocuments(boolean qmEnabled) { String query = "SELECT * from c where c.prop = 99"; @@ -190,11 +186,16 @@ public void crossPartitionQueryNotEnabled() { FeedOptions options = new FeedOptions(); Flux> queryObservable = createdCollection.queryItems(query, options); - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(CosmosClientException.class) - .statusCode(400) + List expectedDocs = createdDocuments; + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder() + .totalSize(expectedDocs.size()) + .exactlyContainsInAnyOrder(expectedDocs.stream().map(d -> d.resourceId()).collect(Collectors.toList())) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .build()) .build(); - validateQueryFailure(queryObservable, validator); + validateQuerySuccess(queryObservable, validator); } @Test(groups = { "simple" }, timeOut = 2 * TIMEOUT) @@ -217,7 +218,7 @@ public void partitionKeyRangeId() { assertThat(sum).isEqualTo(createdDocuments.size()); } - + @Test(groups = { "simple" }, timeOut = TIMEOUT) public void compositeContinuationTokenRoundTrip() throws Exception { { @@ -237,14 +238,14 @@ public void compositeContinuationTokenRoundTrip() throws Exception { assertThat(range.isMinInclusive()).isEqualTo(false); assertThat(range.isMaxInclusive()).isEqualTo(true); } - + { // Negative ValueHolder outCompositeContinuationToken = new ValueHolder(); boolean succeeed = CompositeContinuationToken.tryParse("{\"property\" : \"not a valid composite continuation token\"}", outCompositeContinuationToken); assertThat(succeeed).isFalse(); } - + { // Negative - GATEWAY composite continuation token ValueHolder outCompositeContinuationToken = new ValueHolder(); @@ -257,11 +258,11 @@ public void compositeContinuationTokenRoundTrip() throws Exception { @Test(groups = { "non-emulator" }, timeOut = TIMEOUT * 10) public void queryDocumentsWithCompositeContinuationTokens() throws Exception { String query = "SELECT * FROM c"; - + // Get Expected List expectedDocs = new ArrayList<>(createdDocuments); assertThat(expectedDocs).isNotEmpty(); - + this.queryWithContinuationTokensAndPageSizes(query, new int[] {1, 10, 100}, expectedDocs); } @@ -321,7 +322,7 @@ public CosmosItemProperties createDocument(CosmosContainer cosmosContainer, int return cosmosContainer.createItem(docDefinition).block().properties(); } - + private void queryWithContinuationTokensAndPageSizes(String query, int[] pageSizes, List expectedDocs) { for (int pageSize : pageSizes) { List receivedDocuments = this.queryWithContinuationTokens(query, pageSize); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ReadFeedDocumentsTest.java index 1268614a9757..1ef8bbe1013b 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/ReadFeedDocumentsTest.java @@ -25,8 +25,6 @@ import java.util.UUID; import java.util.stream.Collectors; -//FIXME: beforeClass times out inconsistently -@Ignore public class ReadFeedDocumentsTest extends TestSuiteBase { private CosmosDatabase createdDatabase; @@ -61,18 +59,21 @@ public void readDocuments() { @Test(groups = { "simple" }, timeOut = FEED_TIMEOUT) public void readDocuments_withoutEnableCrossPartitionQuery() { + // With introduction of queryplan, crosspartition need not be enabled anymore. FeedOptions options = new FeedOptions(); options.maxItemCount(2); Flux> feedObservable = createdCollection.readAllItems(options); - FailureValidator validator = FailureValidator.builder().instanceOf(CosmosClientException.class) - .statusCode(400) - .errorMessageContains("Cross partition query is required but disabled." + - " Please set x-ms-documentdb-query-enablecrosspartition to true," + - " specify x-ms-documentdb-partitionkey," + - " or revise your query to avoid this exception.") - .build(); - validateQueryFailure(feedObservable, validator, FEED_TIMEOUT); + FeedResponseListValidator validator = new FeedResponseListValidator.Builder() + .totalSize(createdDocuments.size()) + .numberOfPagesIsGreaterThanOrEqualTo(1) + .exactlyContainsInAnyOrder(createdDocuments.stream().map(d -> d.resourceId()).collect(Collectors.toList())) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .pageSizeIsLessThanOrEqualTo(options.maxItemCount()) + .build()) + .build(); + validateQuerySuccess(feedObservable, validator, FEED_TIMEOUT); } @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/SinglePartitionDocumentQueryTest.java index 892b879de541..e741f9fb239e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -33,8 +33,6 @@ import static org.assertj.core.api.Assertions.assertThat; -// FIXME beforeclass times out inconsistently -@Ignore public class SinglePartitionDocumentQueryTest extends TestSuiteBase { private Database createdDatabase; @@ -52,8 +50,6 @@ public SinglePartitionDocumentQueryTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } - //FIXME test is flaky - @Ignore @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") public void queryDocuments(boolean queryMetricsEnabled) throws Exception { @@ -87,7 +83,7 @@ public void queryDocuments_ParameterizedQueryWithInClause() throws Exception { String query = "SELECT * from c where c.prop IN (@param1, @param2)"; SqlParameterList params = new SqlParameterList(new SqlParameter("@param1", 3), new SqlParameter("@param2", 4)); SqlQuerySpec sqs = new SqlQuerySpec(query, params); - + FeedOptions options = new FeedOptions(); options.maxItemCount(5); options.enableCrossPartitionQuery(true); @@ -210,20 +206,20 @@ public void continuationToken() throws Exception { options.enableCrossPartitionQuery(true); options.maxItemCount(3); Flux> queryObservable = createdCollection.queryItems(query, options); - + TestSubscriber> subscriber = new TestSubscriber<>(); queryObservable.take(1).subscribe(subscriber); - + subscriber.awaitTerminalEvent(); subscriber.assertComplete(); subscriber.assertNoErrors(); assertThat(subscriber.valueCount()).isEqualTo(1); FeedResponse page = ((FeedResponse) subscriber.getEvents().get(0).get(0)); assertThat(page.results()).hasSize(3); - + assertThat(page.continuationToken()).isNotEmpty(); - - + + options.requestContinuation(page.continuationToken()); queryObservable = createdCollection.queryItems(query, options); @@ -231,7 +227,7 @@ public void continuationToken() throws Exception { int expectedPageSize = (expectedDocs.size() + options.maxItemCount() - 1) / options.maxItemCount(); assertThat(expectedDocs).hasSize(createdDocuments.size() -3); - + FeedResponseListValidator validator = new FeedResponseListValidator.Builder() .containsExactly(expectedDocs.stream() .sorted((e1, e2) -> Integer.compare(e1.getInt("prop"), e2.getInt("prop"))) @@ -242,7 +238,7 @@ public void continuationToken() throws Exception { .build(); validateQuerySuccess(queryObservable, validator); } - + @Test(groups = { "simple" }, timeOut = TIMEOUT) public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/TopQueryTests.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/TopQueryTests.java index 6e862ceb8bbe..fb0fef06485b 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/TopQueryTests.java @@ -46,8 +46,6 @@ public TopQueryTests(CosmosClientBuilder clientBuilder) { super(clientBuilder); } - //FIXME: line 82 Assertion error. Error message "Expecting <0> to be greater than <0>" : FastIntegrationTests (Builds: SingeRegionSessionTcp) - @Ignore @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider", retryAnalyzer = RetryAnalyzer.class) public void queryDocumentsWithTop(boolean qmEnabled) throws Exception { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/UserQueryTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/UserQueryTest.java index 163d1a11c3be..069b6f1d29a6 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/UserQueryTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/UserQueryTest.java @@ -41,8 +41,7 @@ public UserQueryTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } - //FIXME test times out inconsistently - @Ignore + @Test(groups = { "simple" }, timeOut = TIMEOUT) public void queryUsersWithFilter() throws Exception {