Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieving query info using new query plan retriever #111

Merged
merged 17 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ public static boolean isFeedRequest(OperationType requestOperationType) {
requestOperationType == OperationType.ReadFeed ||
requestOperationType == OperationType.Query ||
requestOperationType == OperationType.SqlQuery ||
requestOperationType == OperationType.QueryPlan ||
requestOperationType == OperationType.HeadFeed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Used internally to encapsulates a query's information in the Azure Cosmos DB database service.
*/
public final class QueryInfo extends JsonSerializable {
private static final String HAS_SELECT_VALUE = "hasSelectValue";
private Integer top;
private List<SortOrder> orderBy;
private Collection<AggregateOperator> aggregates;
Expand Down Expand Up @@ -89,4 +90,8 @@ public Collection<String> getOrderByExpressions() {
? this.orderByExpressions
: (this.orderByExpressions = super.getCollection("orderByExpressions", String.class));
}

public boolean hasSelectValue(){
return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ private Observable<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest
}

private Observable<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -462,7 +464,8 @@ private Observable<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServ
case Replace:
return this.replace(request);
case SqlQuery:
case Query:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation type " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.microsoft.azure.cosmosdb.internal.query.QueryInfo;
import com.microsoft.azure.cosmosdb.rx.internal.BadRequestException;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import com.microsoft.azure.cosmosdb.rx.internal.Strings;
import com.microsoft.azure.cosmosdb.rx.internal.Utils;
import com.microsoft.azure.cosmosdb.rx.internal.caches.RxCollectionCache;

Expand Down Expand Up @@ -74,31 +75,52 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC
boolean isContinuationExpected,
UUID correlatedActivityId) {

// return proxy
Observable<DocumentCollection> collectionObs = Observable.just(null);

if (resourceTypeEnum.isCollectionChild()) {
collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).toObservable();
}

// We create a ProxyDocumentQueryExecutionContext that will be initialized with DefaultDocumentQueryExecutionContext
// which will be used to send the query to Gateway and on getting 400(bad request) with 1004(cross parition query not servable), we initialize it with
// PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from Gateway).

Observable<ProxyDocumentQueryExecutionContext<T>> proxyQueryExecutionContext =
collectionObs.flatMap(collection ->
ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
collection,
isContinuationExpected,
correlatedActivityId));

return proxyQueryExecutionContext;
DefaultDocumentQueryExecutionContext<T> queryExecutionContext = new DefaultDocumentQueryExecutionContext<T>(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected);

if (ResourceType.Document != resourceTypeEnum
|| (feedOptions != null && feedOptions.getPartitionKey() != null)
|| (feedOptions != null && feedOptions.getPartitionKeyRangeIdInternal() != null)) {
return Observable.just(queryExecutionContext);
}

Single<PartitionedQueryExecutionInfo> queryExecutionInfoSingle =
QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink);

return collectionObs.flatMap(collection -> queryExecutionInfoSingle.toObservable()
.flatMap(partitionedQueryExecutionInfo -> {
Single<List<PartitionKeyRange>> partitionKeyRanges = queryExecutionContext
.getTargetPartitionKeyRanges(collection.getResourceId(), partitionedQueryExecutionInfo.getQueryRanges());

Observable<IDocumentQueryExecutionContext<T>> exContext = partitionKeyRanges
.toObservable()
.flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
isContinuationExpected,
partitionedQueryExecutionInfo,
pkranges,
collection.getResourceId(),
correlatedActivityId));

return exContext;
}));
}

public static <T extends Resource> Observable<? extends IDocumentQueryExecutionContext<T>> createSpecializedDocumentQueryExecutionContextAsync(
Expand All @@ -114,6 +136,10 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC
String collectionRid,
UUID correlatedActivityId) {

if(feedOptions == null){
feedOptions = new FeedOptions();
}

int initialPageSize = Utils.getValueOrDefault(feedOptions.getMaxItemCount(), ParallelQueryConfig.ClientInternalPageSize);

BadRequestException validationError = Utils.checkRequestOrReturnException
Expand All @@ -124,6 +150,23 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC

QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();

// non value aggregates must go through DefaultDocumentQueryExecutionContext
if(queryInfo.hasAggregates() && !queryInfo.hasSelectValue()){
return Observable.just( new DefaultDocumentQueryExecutionContext<T>(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected));
}

if (!Strings.isNullOrEmpty(queryInfo.getRewrittenQuery())) {
query = new SqlQuerySpec(queryInfo.getRewrittenQuery(), query.getParameters());
}

boolean getLazyFeedResponse = queryInfo.hasTop();

// We need to compute the optimal initial page size for order-by queries
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.microsoft.azure.cosmosdb.rx.internal.query;

enum QueryFeature {
None,
Aggregate,
CompositeAggregate,
Distinct,
GroupBy,
MultipleAggregates,
MultipleOrderBy,
OffsetAndLimit,
OrderBy,
Top
}
Loading