Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieving query info using new query plan retriever #111

Merged
merged 17 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,10 @@ public void validate(List<FeedResponse<T>> feedList) {
assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0);
assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0);
assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getQueryPreparationTimes().getPhysicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getQueryPreparationTimes().getQueryCompilationTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getRuntimeExecutionTimes().getQueryEngineExecutionTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class FeedOptions extends FeedOptionsBase {
private int maxDegreeOfParallelism;
private int maxBufferedItemCount;
private int responseContinuationTokenLimitInKb;
private boolean allowEmptyPages;

public FeedOptions() {}

Expand All @@ -48,6 +49,7 @@ public FeedOptions(FeedOptions options) {
this.maxDegreeOfParallelism = options.maxDegreeOfParallelism;
this.maxBufferedItemCount = options.maxBufferedItemCount;
this.responseContinuationTokenLimitInKb = options.responseContinuationTokenLimitInKb;
this.allowEmptyPages = options.allowEmptyPages;
}

/**
Expand Down Expand Up @@ -229,4 +231,19 @@ public void setResponseContinuationTokenLimitInKb(int limitInKb) {
public int getResponseContinuationTokenLimitInKb() {
return responseContinuationTokenLimitInKb;
}

/**
* Gets the option to allow empty result pages in feed response.
*/
public boolean getAllowEmptyPages() {
return allowEmptyPages;
}

/**
* Sets the option to allow empty result pages in feed response. Defaults to false
* @param allowEmptyPages whether to allow empty pages in feed response
*/
public void setAllowEmptyPages(boolean allowEmptyPages) {
this.allowEmptyPages = allowEmptyPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down Expand Up @@ -274,6 +277,7 @@ public static class Versions {
// https://stackoverflow.com/questions/2469922/generate-a-version-java-file-in-maven
public static final String SDK_VERSION = "2.5.1";
public static final String SDK_NAME = "cosmosdb-java-sdk";
public static final String QUERY_VERSION = "1.0";
}

public static class StatusCodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ public static boolean isFeedRequest(OperationType requestOperationType) {
requestOperationType == OperationType.ReadFeed ||
requestOperationType == OperationType.Query ||
requestOperationType == OperationType.SqlQuery ||
requestOperationType == OperationType.QueryPlan ||
requestOperationType == OperationType.HeadFeed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected void writeIndexHitRatio(double indexHitRatio) {

@Override
protected void writeTotalQueryExecutionTime(Duration totalQueryExecutionTime) {
QueryMetricsTextWriter.appendNanosecondsToStringBuilder(stringBuilder,
QueryMetricsTextWriter.appendMillisecondsToStringBuilder(stringBuilder,
QueryMetricsTextWriter.TotalQueryExecutionTime, durationToMilliseconds(totalQueryExecutionTime), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.Undefined;
import com.microsoft.azure.cosmosdb.internal.Utils;
import com.microsoft.azure.cosmosdb.rx.internal.RMResources;
Expand Down Expand Up @@ -222,6 +223,10 @@ public List<IPartitionKeyComponent> getComponents() {
return components;
}

public String getEffectivePartitionKeyString(PartitionKeyInternal internalPartitionKey, PartitionKeyDefinition partitionKey) {
return PartitionKeyInternalHelper.getEffectivePartitionKeyString(internalPartitionKey, partitionKey);
}

@SuppressWarnings("serial")
static final class PartitionKeyInternalJsonSerializer extends StdSerializer<PartitionKeyInternal> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Used internally to encapsulates a query's information in the Azure Cosmos DB database service.
*/
public final class QueryInfo extends JsonSerializable {
private static final String HAS_SELECT_VALUE = "hasSelectValue";
private Integer top;
private List<SortOrder> orderBy;
private Collection<AggregateOperator> aggregates;
Expand Down Expand Up @@ -89,4 +90,8 @@ public Collection<String> getOrderByExpressions() {
? this.orderByExpressions
: (this.orderByExpressions = super.getCollection("orderByExpressions", String.class));
}

public boolean hasSelectValue(){
return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand All @@ -26,8 +26,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;

import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.rx.internal.IRoutingMapProvider;
import rx.Observable;
import rx.Single;

/**
* Provide utility functionality to route request in direct connectivity mode in the Azure Cosmos DB database service.
Expand All @@ -39,7 +43,7 @@ private static String max(String left, String right) {
return left.compareTo(right) < 0 ? right : left;
}

private static <T extends Comparable<T>> boolean IsSortedAndNonOverlapping(List<Range<T>> list) {
private static <T extends Comparable<T>> boolean isSortedAndNonOverlapping(List<Range<T>> list) {
for (int i = 1; i < list.size(); i++) {
Range<T> previousRange = list.get(i - 1);
Range<T> currentRange = list.get(i);
Expand All @@ -57,7 +61,7 @@ private static <T extends Comparable<T>> boolean IsSortedAndNonOverlapping(List<

public static Collection<PartitionKeyRange> getOverlappingRanges(RoutingMapProvider routingMapProvider,
String collectionSelfLink, List<Range<String>> sortedRanges) {
if (!IsSortedAndNonOverlapping(sortedRanges)) {
if (!isSortedAndNonOverlapping(sortedRanges)) {
throw new IllegalArgumentException("sortedRanges");
}

Expand Down Expand Up @@ -94,4 +98,65 @@ public static Collection<PartitionKeyRange> getOverlappingRanges(RoutingMapProvi

return targetRanges;
}

public static Single<List<PartitionKeyRange>> getOverlappingRanges(IRoutingMapProvider routingMapProvider,
String resourceId, List<Range<String>> sortedRanges) {

if (routingMapProvider == null){
throw new IllegalArgumentException("routingMapProvider");
}

if (sortedRanges == null) {
throw new IllegalArgumentException("sortedRanges");
}

if (!isSortedAndNonOverlapping(sortedRanges)) {
throw new IllegalArgumentException("sortedRanges");
}

List<PartitionKeyRange> targetRanges = new ArrayList<>();
final ListIterator<Range<String>> iterator = sortedRanges.listIterator();

return Observable.defer(() -> {
if (!iterator.hasNext()) {
return Observable.empty();
}

Range<String> queryRange;
Range<String> sortedRange = iterator.next();
if (!targetRanges.isEmpty()) {
String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(),
sortedRange.getMin());

boolean leftInclusive = left.compareTo(sortedRange.getMin()) == 0 && sortedRange.isMinInclusive();

queryRange = new Range<String>(left, sortedRange.getMax(), leftInclusive,
sortedRange.isMaxInclusive());
} else {
queryRange = sortedRange;
}

return routingMapProvider.tryGetOverlappingRangesAsync(resourceId, queryRange, false, null)
.map(targetRanges::addAll)
.flatMap(aBoolean -> {
if (!targetRanges.isEmpty()) {
Range<String> lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange();
while (iterator.hasNext()) {
Range<String> value = iterator.next();
if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) {
// Since we already moved forward on iterator to check above condition, we
// go to previous when it fails so the the value is not skipped on iteration
iterator.previous();
break;
}
}
}
return Single.just(targetRanges);
}).toObservable();
}).repeat(sortedRanges.size())
.takeUntil(stringRange -> !iterator.hasNext())
.last()
.toSingle();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ private Observable<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest
}

private Observable<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -462,7 +464,8 @@ private Observable<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServ
case Replace:
return this.replace(request);
case SqlQuery:
case Query:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation type " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package com.microsoft.azure.cosmosdb.rx.internal.query;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -40,6 +41,7 @@
import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyRangeIdentity;
import com.microsoft.azure.cosmosdb.internal.routing.Range;
import com.microsoft.azure.cosmosdb.internal.routing.RoutingMapProviderHelper;
import com.microsoft.azure.cosmosdb.rx.internal.BackoffRetryUtility;
import com.microsoft.azure.cosmosdb.rx.internal.IDocumentClientRetryPolicy;
import com.microsoft.azure.cosmosdb.rx.internal.InvalidPartitionExceptionRetryPolicy;
Expand Down Expand Up @@ -132,10 +134,14 @@ public Observable<FeedResponse<T>> executeAsync() {
}

public Single<List<PartitionKeyRange>> getTargetPartitionKeyRanges(String resourceId, List<Range<String>> queryRanges) {
// TODO: FIXME this needs to be revisited

Range<String> r = new Range<>("", "FF", true, false);
return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null);
return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges);
}

public Single<List<PartitionKeyRange>> getTargetPartitionKeyRangesById(String resourceId, String partitionKeyRangeIdInternal) {
return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId,
partitionKeyRangeIdInternal,
false,
null).flatMap(partitionKeyRange -> Single.just(Collections.singletonList(partitionKeyRange)));
}

protected Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> executeInternalAsyncFunc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part
}

if (this.resourceTypeEnum.isPartitioned()) {
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null;
if(!hasPartitionKey){
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
}
}
}

Expand Down
Loading