Porting offset limit support from V2
Adding test to drain all the documents (Azure#257)
mbhaskar committed Oct 31, 2019
1 parent 173494d commit 082b399
Showing 7 changed files with 443 additions and 10 deletions.
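
For context, this commit lets the async Java client (com.azure.data.cosmos) execute Cosmos SQL queries that use OFFSET ... LIMIT, for example SELECT * FROM c ORDER BY c.id OFFSET 5 LIMIT 10, by adding a skip component and a take component to the query execution pipeline. The sketch below shows intended usage at the SDK surface; the container is hypothetical, and the queryItems/FeedOptions call shapes are assumptions based on the v3 preview API rather than anything in this diff.

// Minimal usage sketch (assumed API surface; only FeedOptions.maxItemCount(),
// FeedOptions.requestContinuation() and FeedResponse.results() appear in this commit).
import com.azure.data.cosmos.CosmosContainer;
import com.azure.data.cosmos.FeedOptions;
import reactor.core.publisher.Flux;

public class OffsetLimitQuerySample {
    public static void run(CosmosContainer container) {
        FeedOptions options = new FeedOptions();
        options.enableCrossPartitionQuery(true); // assumed setter; OFFSET/LIMIT can span partitions
        options.maxItemCount(100);               // page size per round trip

        container.queryItems("SELECT * FROM c ORDER BY c.id OFFSET 5 LIMIT 10", options)
                 .flatMap(page -> Flux.fromIterable(page.results()))
                 .subscribe(item -> System.out.println(item));
    }
}
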
OffsetContinuationToken.java
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.cosmos.internal.query;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.JsonSerializable;
import com.azure.data.cosmos.internal.Utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OffsetContinuationToken extends JsonSerializable {
private static final String TOKEN_PROPERTY_NAME = "sourceToken";
private static final String OFFSET_PROPERTY_NAME = "offset";
private static final Logger logger = LoggerFactory.getLogger(OffsetContinuationToken.class);

public OffsetContinuationToken(int offset, String sourceToken) {

if (offset < 0) {
throw new IllegalArgumentException("offset should be non negative");
}

this.setOffset(offset);
this.setSourceToken(sourceToken);
}

public OffsetContinuationToken(String serializedCompositeToken) {
super(serializedCompositeToken);
this.getOffset();
this.getSourceToken();
}

public static boolean tryParse(String serializedOffsetContinuationToken,
Utils.ValueHolder<OffsetContinuationToken> outOffsetContinuationToken) {
if (StringUtils.isEmpty(serializedOffsetContinuationToken)) {
return false;
}

boolean parsed;
try {
outOffsetContinuationToken.v = new OffsetContinuationToken(serializedOffsetContinuationToken);
parsed = true;
} catch (Exception ex) {
logger.debug("Received exception {} when trying to parse: {}",
ex.getMessage(),
serializedOffsetContinuationToken);
parsed = false;
outOffsetContinuationToken.v = null;
}

return parsed;
}

public String getSourceToken() {
return super.getString(TOKEN_PROPERTY_NAME);
}

private void setSourceToken(String sourceToken) {
BridgeInternal.setProperty(this, TOKEN_PROPERTY_NAME, sourceToken);
}

public int getOffset() {
return super.getInt(OFFSET_PROPERTY_NAME);
}

private void setOffset(int offset) {
BridgeInternal.setProperty(this, OFFSET_PROPERTY_NAME, offset);
}
}
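
A small illustrative round trip of this token; it uses only members shown in this commit (toJson() is inherited from JsonSerializable and is called on this type later in the diff), and the values are made up:

// Assumes the com.azure.data.cosmos.internal.query package, since the type is internal.
import com.azure.data.cosmos.internal.Utils;

final class OffsetContinuationTokenRoundTrip {
    static void demo() {
        // Serialize: 3 documents still to skip, wrapping the source component's continuation.
        OffsetContinuationToken token = new OffsetContinuationToken(3, "<source continuation>");
        String serialized = token.toJson(); // JSON object with "offset" and "sourceToken" properties

        // Parse it back; tryParse returns false (and nulls the holder) on malformed input.
        Utils.ValueHolder<OffsetContinuationToken> holder = new Utils.ValueHolder<>();
        if (OffsetContinuationToken.tryParse(serialized, holder)) {
            int remainingOffset = holder.v.getOffset();     // 3
            String sourceToken = holder.v.getSourceToken(); // "<source continuation>"
        }
    }
}
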

PipelinedDocumentQueryExecutionContext.java
@@ -82,14 +82,41 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T
createAggregateComponentFunction = createBaseComponentFunction;
}

Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSkipComponentFunction;
if (queryInfo.hasOffset()) {
createSkipComponentFunction = (continuationToken) -> {
return SkipDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction,
queryInfo.getOffset(),
continuationToken);
};
} else {
createSkipComponentFunction = createAggregateComponentFunction;
}

Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createTopComponentFunction;
if (queryInfo.hasTop()) {
createTopComponentFunction = (continuationToken) -> {
- return TopDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction,
- queryInfo.getTop(), continuationToken);
+ return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction,
+ queryInfo.getTop(), queryInfo.getTop(), continuationToken);
};
} else {
createTopComponentFunction = createSkipComponentFunction;
}

Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createTakeComponentFunction;
if (queryInfo.hasLimit()) {
createTakeComponentFunction = (continuationToken) -> {
int totalLimit = queryInfo.getLimit();
if (queryInfo.hasOffset()) {
// This is being done to match the limit from rewritten query
totalLimit = queryInfo.getOffset() + queryInfo.getLimit();
}
return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction,
queryInfo.getLimit(), totalLimit,
continuationToken);
};
} else {
- createTopComponentFunction = createAggregateComponentFunction;
+ createTakeComponentFunction = createTopComponentFunction;
}

int actualPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(),
@@ -100,7 +127,7 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T
}

int pageSize = Math.min(actualPageSize, Utils.getValueOrDefault(queryInfo.getTop(), (actualPageSize)));
- return createTopComponentFunction.apply(feedOptions.requestContinuation())
+ return createTakeComponentFunction.apply(feedOptions.requestContinuation())
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId));
}

QueryInfo.java
@@ -20,6 +20,8 @@ public final class QueryInfo extends JsonSerializable {
private Collection<AggregateOperator> aggregates;
private Collection<String> orderByExpressions;
private String rewrittenQuery;
private Integer offset;
private Integer limit;

public QueryInfo() { }

@@ -73,4 +75,21 @@ public Collection<String> getOrderByExpressions() {
public boolean hasSelectValue(){
return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE);
}

public boolean hasOffset() {
return this.getOffset() != null;
}

public boolean hasLimit() {
return this.getLimit() != null;
}

public Integer getLimit() {
return this.limit != null ? this.limit : (this.limit = super.getInt("limit"));
}

public Integer getOffset() {
return this.offset != null ? this.offset : (this.offset = super.getInt("offset"));
}
}
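
getOffset() and getLimit() lazily read the "offset" and "limit" properties from the queryInfo section of the query plan returned by the gateway and cache them in the new fields. An illustrative fragment (values invented; only the two property names come from the code above):

// Illustrative queryInfo fragment from a query plan response:
// { ..., "offset": 5, "limit": 10, ... }
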

QueryPlanRetriever.java
@@ -24,6 +24,7 @@ class QueryPlanRetriever {
QueryFeature.CompositeAggregate.name() + ", " +
QueryFeature.MultipleOrderBy.name() + ", " +
QueryFeature.OrderBy.name() + ", " +
QueryFeature.OffsetAndLimit.name() + ", " +
QueryFeature.Top.name();

static Mono<PartitionedQueryExecutionInfo> getQueryPlanThroughGatewayAsync(IDocumentQueryClient queryClient,
SkipDocumentQueryExecutionContext.java
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.cosmos.internal.query;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.Resource;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.Utils;
import reactor.core.publisher.Flux;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class SkipDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionComponent<T> {

private final IDocumentQueryExecutionComponent<T> component;
private int skipCount;

SkipDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int skipCount) {
if (component == null) {
throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null");
}
this.component = component;
this.skipCount = skipCount;
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
int skipCount,
String continuationToken) {
OffsetContinuationToken offsetContinuationToken;
Utils.ValueHolder<OffsetContinuationToken> outOffsetContinuationToken = new Utils.ValueHolder<>();
if (continuationToken != null) {
if (!OffsetContinuationToken.tryParse(continuationToken, outOffsetContinuationToken)) {
String message = String.format("Invalid JSON in continuation token %s for Skip~Context",
continuationToken);
CosmosClientException dce =
BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST,
message);
return Flux.error(dce);
}

offsetContinuationToken = outOffsetContinuationToken.v;
} else {
offsetContinuationToken = new OffsetContinuationToken(skipCount, null);
}

return createSourceComponentFunction.apply(offsetContinuationToken.getSourceToken())
.map(component -> new SkipDocumentQueryExecutionContext<>(component,
offsetContinuationToken.getOffset()));
}

@Override
public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {

return this.component.drainAsync(maxPageSize).map(tFeedResponse -> {

List<T> documentsAfterSkip =
tFeedResponse.results().stream().skip(this.skipCount).collect(Collectors.toList());

int numberOfDocumentsSkipped = tFeedResponse.results().size() - documentsAfterSkip.size();
this.skipCount -= numberOfDocumentsSkipped;

Map<String, String> headers = new HashMap<>(tFeedResponse.responseHeaders());
if (this.skipCount >= 0) {
// Add Offset Continuation Token
String sourceContinuationToken = tFeedResponse.continuationToken();
OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(this.skipCount,
sourceContinuationToken);
headers.put(HttpConstants.HttpHeaders.CONTINUATION, offsetContinuationToken.toJson());
}

return BridgeInternal.createFeedResponseWithQueryMetrics(documentsAfterSkip, headers,
BridgeInternal.queryMetricsFromFeedResponse(tFeedResponse));
});
}

IDocumentQueryExecutionComponent<T> getComponent() {
return this.component;
}
}
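
A walk-through of the skip bookkeeping in drainAsync() above, with invented page sizes, showing how the remaining offset travels in the rewritten continuation header:

// Worked example (assumed page contents), starting with skipCount = 5:
//
//   page 1 arrives with 3 documents:
//     documentsAfterSkip = []                      (all 3 skipped)
//     numberOfDocumentsSkipped = 3, skipCount = 5 - 3 = 2
//     continuation header -> OffsetContinuationToken(offset = 2, sourceToken = page 1's token)
//
//   page 2 arrives with 4 documents:
//     documentsAfterSkip = last 2 documents        (first 2 skipped)
//     numberOfDocumentsSkipped = 2, skipCount = 2 - 2 = 0
//     continuation header -> OffsetContinuationToken(offset = 0, sourceToken = page 2's token)
//
// A caller that stops after page 1 can resume later: OffsetContinuationToken.tryParse()
// restores offset = 2 plus the wrapped source token, so the remaining skip is still honored.
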
TopDocumentQueryExecutionContext.java
@@ -20,15 +20,18 @@ public class TopDocumentQueryExecutionContext<T extends Resource> implements IDo

private final IDocumentQueryExecutionComponent<T> component;
private final int top;
// limit from rewritten query
private final int limit;

- public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int top) {
+ public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int top, int limit) {
this.component = component;
this.top = top;
this.limit = limit;
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
- int topCount, String topContinuationToken) {
+ int topCount, int limit, String topContinuationToken) {
TakeContinuationToken takeContinuationToken;

if (topContinuationToken == null) {
@@ -56,21 +59,26 @@ public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> cre

return createSourceComponentFunction
.apply(takeContinuationToken.getSourceToken())
- .map(component -> new TopDocumentQueryExecutionContext<>(component, takeContinuationToken.getTakeCount()));
+ .map(component -> new TopDocumentQueryExecutionContext<>(component, takeContinuationToken.getTakeCount(), limit));
}

@Override
public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
ParallelDocumentQueryExecutionContextBase<T> context;

if (this.component instanceof AggregateDocumentQueryExecutionContext<?>) {
- context = (ParallelDocumentQueryExecutionContextBase<T>) ((AggregateDocumentQueryExecutionContext<T>) this.component)
- .getComponent();
+ context =
+ (ParallelDocumentQueryExecutionContextBase<T>) ((AggregateDocumentQueryExecutionContext<T>) this.component)
+ .getComponent();
} else if (this.component instanceof SkipDocumentQueryExecutionContext<?>) {
context =
(ParallelDocumentQueryExecutionContextBase<T>) ((SkipDocumentQueryExecutionContext<T>) this.component)
.getComponent();
} else {
context = (ParallelDocumentQueryExecutionContextBase<T>) this.component;
}

- context.setTop(this.top);
+ context.setTop(this.limit);

return this.component.drainAsync(maxPageSize).takeUntil(new Predicate<FeedResponse<T>>() {

[Diff for the seventh changed file not loaded.]
