Skip to content

Commit

Permalink
Enabling ignored query tests. Ported Queryplan and OffsetLimit (#5365)
Browse files Browse the repository at this point in the history
* Enabling ignored query tests.

* Porting query plan from v2

* Fixing failing tests
Disabling non value aggregates

* Porting offset limit support from V2
Adding test to drain all the documents (#257)

* Fixing failing tests

* Fixing failing tests
  • Loading branch information
mbhaskar authored Nov 6, 2019
1 parent 769753d commit dc68713
Show file tree
Hide file tree
Showing 32 changed files with 895 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class FeedOptions {
private PartitionKey partitionkey;
private boolean populateQueryMetrics;
private Map<String, Object> properties;
private boolean allowEmptyPages;

public FeedOptions() {
}
Expand All @@ -40,6 +41,7 @@ public FeedOptions(FeedOptions options) {
this.requestContinuation = options.requestContinuation;
this.partitionkey = options.partitionkey;
this.populateQueryMetrics = options.populateQueryMetrics;
this.allowEmptyPages = options.allowEmptyPages;
}

/**
Expand Down Expand Up @@ -338,4 +340,19 @@ public FeedOptions properties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

/**
* Gets the option to allow empty result pages in feed response.
*/
public boolean allowEmptyPages() {
return allowEmptyPages;
}

/**
* Sets the option to allow empty result pages in feed response. Defaults to false
* @param allowEmptyPages whether to allow empty pages in feed response
*/
public void allowEmptyPages(boolean allowEmptyPages) {
this.allowEmptyPages = allowEmptyPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down Expand Up @@ -248,6 +251,7 @@ public static class A_IMHeaderValues {

public static class Versions {
public static final String CURRENT_VERSION = "2018-12-31";
public static final String QUERY_VERSION = "1.0";

// TODO: FIXME we can use maven plugin for generating a version file
// @see
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand All @@ -50,4 +51,4 @@ public boolean isWriteOperation() {
this == Upsert ||
this == Update;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ private Flux<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest reques
}

private Flux<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -352,6 +354,7 @@ private Flux<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceReq
return this.replace(request);
case SqlQuery:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation type " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,22 +499,22 @@ public static <O, I> O as(I i, Class<O> klass) {
return null;
}
}

@SuppressWarnings("unchecked")
public static <V> List<V> immutableListOf() {
return Collections.EMPTY_LIST;
}

public static <V> List<V> immutableListOf(V v1) {
List<V> list = new ArrayList<>();
list.add(v1);
return Collections.unmodifiableList(list);
}

public static <K, V> Map<K, V>immutableMapOf() {
return Collections.emptyMap();
}

public static <K, V> Map<K, V>immutableMapOf(K k1, V v1) {
Map<K, V> map = new HashMap<K ,V>();
map.put(k1, v1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.azure.data.cosmos.internal.routing.PartitionKeyInternal;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import com.azure.data.cosmos.internal.routing.Range;
import com.azure.data.cosmos.internal.routing.RoutingMapProviderHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -85,16 +87,16 @@ public Flux<FeedResponse<T>> executeAsync() {
if (feedOptions == null) {
feedOptions = new FeedOptions();
}

FeedOptions newFeedOptions = new FeedOptions(feedOptions);

// We can not go to backend with the composite continuation token,
// but we still need the gateway for the query plan.
// The workaround is to try and parse the continuation token as a composite continuation token.
// If it is, then we send the query to the gateway with max degree of parallelism to force getting back the query plan

String originalContinuation = newFeedOptions.requestContinuation();

if (isClientSideContinuationToken(originalContinuation)) {
// At this point we know we want back a query plan
newFeedOptions.requestContinuation(null);
Expand All @@ -115,8 +117,14 @@ public Flux<FeedResponse<T>> executeAsync() {
public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRanges(String resourceId, List<Range<String>> queryRanges) {
// TODO: FIXME this needs to be revisited

Range<String> r = new Range<>("", "FF", true, false);
return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null);
return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges);
}

public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRangesById(String resourceId, String partitionKeyRangeIdInternal) {
return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId,
partitionKeyRangeIdInternal,
false,
null).flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange)));
}

protected Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeInternalAsyncFunc() {
Expand Down Expand Up @@ -222,7 +230,7 @@ public RxDocumentServiceRequest createRequestAsync(String continuationToken, Int

return request;
}

private static boolean isClientSideContinuationToken(String continuationToken) {
if (continuationToken != null) {
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<CompositeContinuationToken>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part
}

if (this.resourceTypeEnum.isPartitioned()) {
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id()));
boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null;
if(!hasPartitionKey){
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

import com.azure.data.cosmos.BadRequestException;
import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.CommonsBridgeInternal;
import com.azure.data.cosmos.internal.DocumentCollection;
import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.PartitionKey;
import com.azure.data.cosmos.Resource;
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.OperationType;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.ResourceType;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.Strings;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.caches.RxCollectionCache;
import com.azure.data.cosmos.internal.routing.PartitionKeyInternal;
import com.azure.data.cosmos.internal.routing.Range;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -60,37 +66,82 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).flux();
}

// We create a ProxyDocumentQueryExecutionContext that will be initialized with DefaultDocumentQueryExecutionContext
// which will be used to send the query to GATEWAY and on getting 400(bad request) with 1004(cross parition query not servable), we initialize it with
// PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from GATEWAY).

Flux<ProxyDocumentQueryExecutionContext<T>> proxyQueryExecutionContext =
collectionObs.flatMap(collection -> {
if (feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.None)) {
feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collection.getPartitionKey())));
}
return ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
collection,
isContinuationExpected,
correlatedActivityId);
}).switchIfEmpty(ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
null,
isContinuationExpected,
correlatedActivityId));

return proxyQueryExecutionContext;
DefaultDocumentQueryExecutionContext<T> queryExecutionContext = new DefaultDocumentQueryExecutionContext<T>(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected);

if (ResourceType.Document != resourceTypeEnum) {
return Flux.just(queryExecutionContext);
}

Mono<PartitionedQueryExecutionInfo> queryExecutionInfoMono =
com.azure.data.cosmos.internal.query.QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink);

return collectionObs.single().flatMap(collection ->
queryExecutionInfoMono.flatMap(partitionedQueryExecutionInfo -> {
QueryInfo queryInfo =
partitionedQueryExecutionInfo.getQueryInfo();
// Non value aggregates must go through
// DefaultDocumentQueryExecutionContext
// Single partition query can serve queries like SELECT AVG(c
// .age) FROM c
// SELECT MIN(c.age) + 5 FROM c
// SELECT MIN(c.age), MAX(c.age) FROM c
// while pipelined queries can only serve
// SELECT VALUE <AGGREGATE>. So we send the query down the old
// pipeline to avoid a breaking change.
// Should be fixed by adding support for nonvalueaggregates
if (queryInfo.hasAggregates() && !queryInfo.hasSelectValue()) {
if (feedOptions != null && feedOptions.enableCrossPartitionQuery()) {
return Mono.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST,
"Cross partition query only supports 'VALUE " +
"<AggreateFunc>' for aggregates"));
}
return Mono.just(queryExecutionContext);
}

Mono<List<PartitionKeyRange>> partitionKeyRanges;
// The partitionKeyRangeIdInternal is no more a public API on FeedOptions, but have the below condition
// for handling ParallelDocumentQueryTest#partitionKeyRangeId
if (feedOptions != null && !StringUtils.isEmpty(CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions))) {
partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRangesById(collection.resourceId(),
CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions));
} else {
List<Range<String>> queryRanges =
partitionedQueryExecutionInfo.getQueryRanges();

if (feedOptions != null && feedOptions.partitionKey() != null) {
PartitionKeyInternal internalPartitionKey =
feedOptions.partitionKey()
.getInternalPartitionKey();
Range<String> range = Range.getPointRange(internalPartitionKey
.getEffectivePartitionKeyString(internalPartitionKey,
collection.getPartitionKey()));
queryRanges = Collections.singletonList(range);
}
partitionKeyRanges = queryExecutionContext
.getTargetPartitionKeyRanges(collection.resourceId(), queryRanges);
}
return partitionKeyRanges
.flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
isContinuationExpected,
partitionedQueryExecutionInfo,
pkranges,
collection.resourceId(),
correlatedActivityId).single());

})).flux();
}

public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createSpecializedDocumentQueryExecutionContextAsync(
Expand All @@ -106,7 +157,12 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
String collectionRid,
UUID correlatedActivityId) {

int initialPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(), ParallelQueryConfig.ClientInternalPageSize);
if (feedOptions == null) {
feedOptions = new FeedOptions();
}

int initialPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(),
ParallelQueryConfig.ClientInternalPageSize);

BadRequestException validationError = Utils.checkRequestOrReturnException(
initialPageSize > 0 || initialPageSize == -1, "MaxItemCount", "Invalid MaxItemCount %s", initialPageSize);
Expand All @@ -116,6 +172,10 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext

QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();

if (!Strings.isNullOrEmpty(queryInfo.getRewrittenQuery())) {
query = new SqlQuerySpec(queryInfo.getRewrittenQuery(), query.parameters());
}

boolean getLazyFeedResponse = queryInfo.hasTop();

// We need to compute the optimal initial page size for order-by queries
Expand Down
Loading

0 comments on commit dc68713

Please sign in to comment.