Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieving query info using new query plan retriever #111

Merged
merged 17 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class FeedOptions extends FeedOptionsBase {
private int maxDegreeOfParallelism;
private int maxBufferedItemCount;
private int responseContinuationTokenLimitInKb;
private boolean allowEmptyPages;

public FeedOptions() {}

Expand All @@ -48,6 +49,7 @@ public FeedOptions(FeedOptions options) {
this.maxDegreeOfParallelism = options.maxDegreeOfParallelism;
this.maxBufferedItemCount = options.maxBufferedItemCount;
this.responseContinuationTokenLimitInKb = options.responseContinuationTokenLimitInKb;
this.allowEmptyPages = options.allowEmptyPages;
}

/**
Expand Down Expand Up @@ -229,4 +231,19 @@ public void setResponseContinuationTokenLimitInKb(int limitInKb) {
public int getResponseContinuationTokenLimitInKb() {
return responseContinuationTokenLimitInKb;
}

/**
* Gets the option to allow empty result pages in feed response.
*/
public boolean getAllowEmptyPages() {
return allowEmptyPages;
}

/**
* Sets the option to allow empty result pages in feed response.
* @param allowEmptyPages
*/
public void setAllowEmptyPages(boolean allowEmptyPages) {
this.allowEmptyPages = allowEmptyPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ public static boolean isFeedRequest(OperationType requestOperationType) {
requestOperationType == OperationType.ReadFeed ||
requestOperationType == OperationType.Query ||
requestOperationType == OperationType.SqlQuery ||
requestOperationType == OperationType.QueryPlan ||
requestOperationType == OperationType.HeadFeed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Used internally to encapsulates a query's information in the Azure Cosmos DB database service.
*/
public final class QueryInfo extends JsonSerializable {
private static final String HAS_SELECT_VALUE = "hasSelectValue";
private Integer top;
private List<SortOrder> orderBy;
private Collection<AggregateOperator> aggregates;
Expand Down Expand Up @@ -89,4 +90,8 @@ public Collection<String> getOrderByExpressions() {
? this.orderByExpressions
: (this.orderByExpressions = super.getCollection("orderByExpressions", String.class));
}

public boolean hasSelectValue(){
return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ private Observable<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest
}

private Observable<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -462,7 +464,8 @@ private Observable<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServ
case Replace:
return this.replace(request);
case SqlQuery:
case Query:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation type " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part
}

if (this.resourceTypeEnum.isPartitioned()) {
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null;
if(!hasPartitionKey){
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
import java.util.List;
import java.util.UUID;

import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.internal.ResourceType;
import com.microsoft.azure.cosmosdb.internal.query.PartitionedQueryExecutionInfo;
import com.microsoft.azure.cosmosdb.internal.query.QueryInfo;
import com.microsoft.azure.cosmosdb.rx.internal.BadRequestException;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import com.microsoft.azure.cosmosdb.rx.internal.Strings;
import com.microsoft.azure.cosmosdb.rx.internal.Utils;
import com.microsoft.azure.cosmosdb.rx.internal.caches.RxCollectionCache;

Expand Down Expand Up @@ -74,31 +77,68 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC
boolean isContinuationExpected,
UUID correlatedActivityId) {

// return proxy
Observable<DocumentCollection> collectionObs = Observable.just(null);

if (resourceTypeEnum.isCollectionChild()) {
collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).toObservable();
}

// We create a ProxyDocumentQueryExecutionContext that will be initialized with DefaultDocumentQueryExecutionContext
// which will be used to send the query to Gateway and on getting 400(bad request) with 1004(cross parition query not servable), we initialize it with
// PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from Gateway).

Observable<ProxyDocumentQueryExecutionContext<T>> proxyQueryExecutionContext =
collectionObs.flatMap(collection ->
ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
collection,
isContinuationExpected,
correlatedActivityId));

return proxyQueryExecutionContext;
DefaultDocumentQueryExecutionContext<T> queryExecutionContext = new DefaultDocumentQueryExecutionContext<T>(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected);

if (ResourceType.Document != resourceTypeEnum
|| (feedOptions != null && feedOptions.getPartitionKeyRangeIdInternal() != null)) {
return Observable.just(queryExecutionContext);
}

Single<PartitionedQueryExecutionInfo> queryExecutionInfoSingle =
QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink);

return collectionObs.flatMap(collection -> queryExecutionInfoSingle.toObservable()
.flatMap(partitionedQueryExecutionInfo -> {
QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();

// Non value aggregates must go through DefaultDocumentQueryExecutionContext
// Single partition query can serve queries like SELECT AVG(c.age) FROM c
// SELECT MIN(c.age) + 5 FROM c
// SELECT MIN(c.age), MAX(c.age) FROM c
// while pipelined queries can only serve
// SELECT VALUE <AGGREGATE>. So we send the query down the old pipeline to avoid a breaking change.
// We will skip this in V3 SDK
if(queryInfo.hasAggregates() && !queryInfo.hasSelectValue()){
if(feedOptions != null && feedOptions.getEnableCrossPartitionQuery()){
return Observable.error(new DocumentClientException(HttpConstants.StatusCodes.BADREQUEST,
"Cross partition query only supports 'VALUE <AggreateFunc>' for aggregates"));
}
return Observable.just( queryExecutionContext);
}

Single<List<PartitionKeyRange>> partitionKeyRanges = queryExecutionContext
.getTargetPartitionKeyRanges(collection.getResourceId(), partitionedQueryExecutionInfo.getQueryRanges());

Observable<IDocumentQueryExecutionContext<T>> exContext = partitionKeyRanges
.toObservable()
.flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
isContinuationExpected,
partitionedQueryExecutionInfo,
pkranges,
collection.getResourceId(),
correlatedActivityId));

return exContext;
}));
}

public static <T extends Resource> Observable<? extends IDocumentQueryExecutionContext<T>> createSpecializedDocumentQueryExecutionContextAsync(
Expand All @@ -114,7 +154,12 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC
String collectionRid,
UUID correlatedActivityId) {

int initialPageSize = Utils.getValueOrDefault(feedOptions.getMaxItemCount(), ParallelQueryConfig.ClientInternalPageSize);
if (feedOptions == null) {
feedOptions = new FeedOptions();
}

int initialPageSize = Utils.getValueOrDefault(feedOptions.getMaxItemCount(),
ParallelQueryConfig.ClientInternalPageSize);

BadRequestException validationError = Utils.checkRequestOrReturnException
(initialPageSize > 0, "MaxItemCount", "Invalid MaxItemCount %s", initialPageSize);
Expand All @@ -123,6 +168,10 @@ public static <T extends Resource> Observable<? extends IDocumentQueryExecutionC
}

QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();

if (!Strings.isNullOrEmpty(queryInfo.getRewrittenQuery())) {
query = new SqlQuerySpec(queryInfo.getRewrittenQuery(), query.getParameters());
}

boolean getLazyFeedResponse = queryInfo.hasTop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
*/
public class ParallelDocumentQueryExecutionContext<T extends Resource>
extends ParallelDocumentQueryExecutionContextBase<T> {

FeedOptions feedOptions;

private ParallelDocumentQueryExecutionContext(
IDocumentQueryClient client,
List<PartitionKeyRange> partitionKeyRanges,
Expand All @@ -75,6 +76,7 @@ private ParallelDocumentQueryExecutionContext(
UUID correlatedActivityId) {
super(client, partitionKeyRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink,
rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId);
this.feedOptions = feedOptions;
}

public static <T extends Resource> Observable<IDocumentQueryExecutionComponent<T>> createAsync(
Expand Down Expand Up @@ -194,16 +196,17 @@ private static class EmptyPagesFilterTransformer<T extends Resource>
implements Transformer<DocumentProducer<T>.DocumentProducerFeedResponse, FeedResponse<T>> {
private final RequestChargeTracker tracker;
private DocumentProducer<T>.DocumentProducerFeedResponse previousPage;
private final FeedOptions feedOptions;

public EmptyPagesFilterTransformer(
RequestChargeTracker tracker) {
public EmptyPagesFilterTransformer(RequestChargeTracker tracker, FeedOptions feedOptions) {

if (tracker == null) {
throw new IllegalArgumentException("Request Charge Tracker must not be null.");
}

this.tracker = tracker;
this.previousPage = null;
this.feedOptions = feedOptions;
}

private DocumentProducer<T>.DocumentProducerFeedResponse plusCharge(
Expand Down Expand Up @@ -246,7 +249,7 @@ private static Map<String, String> headerResponse(
public Observable<FeedResponse<T>> call(
Observable<DocumentProducer<T>.DocumentProducerFeedResponse> source) {
return source.filter(documentProducerFeedResponse -> {
if (documentProducerFeedResponse.pageResult.getResults().isEmpty()) {
if (documentProducerFeedResponse.pageResult.getResults().isEmpty() && !this.feedOptions.getAllowEmptyPages()) {
// filter empty pages and accumulate charge
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
return false;
Expand Down Expand Up @@ -306,7 +309,6 @@ public Observable<FeedResponse<T>> call(
page = current;
page = this.addCompositeContinuationToken(page,
compositeContinuationToken);

return page;
}).map(documentProducerFeedResponse -> {
// Unwrap the documentProducerFeedResponse and get back the feedResponse
Expand All @@ -333,7 +335,8 @@ public Observable<FeedResponse<T>> drainAsync(
.map(dp -> dp.produceAsync())
// Merge results from all partitions.
.collect(Collectors.toList());
return Observable.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker()));
return Observable.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(),
this.feedOptions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,22 @@ protected void initialize(String collectionRid,
Map<String, String> commonRequestHeaders = createCommonHeadersAsync(this.getFeedOptions(null, null));

for (PartitionKeyRange targetRange : partitionKeyRangeToContinuationTokenMap.keySet()) {
// If partitionkey is supplied, then we dont need to run through all ranges as we only add
// x-ms-documentdb-partitionkey header and skip the partitionkeyrangeid id header
if(feedOptions.getPartitionKey() != null && Integer.parseInt(targetRange.getId()) > 0){
continue;
}
Func3<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc = (partitionKeyRange,
continuationToken, pageSize) -> {
Map<String, String> headers = new HashMap<>(commonRequestHeaders);
headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken);
headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize));
if(feedOptions.getPartitionKey() != null){
headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, feedOptions
.getPartitionKey()
.getInternalPartitionKey()
.toJson());
}
return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyRange, collectionRid);
};

Expand Down
Loading