Offset limit query support #234

Merged
merged 3 commits on Jul 24, 2019
QueryInfo.java
@@ -41,6 +41,8 @@ public final class QueryInfo extends JsonSerializable {
private Collection<AggregateOperator> aggregates;
private Collection<String> orderByExpressions;
private String rewrittenQuery;
private Integer offset;
private Integer limit;

public QueryInfo() { }

@@ -90,8 +92,24 @@ public Collection<String> getOrderByExpressions() {
? this.orderByExpressions
: (this.orderByExpressions = super.getCollection("orderByExpressions", String.class));
}
public boolean hasSelectValue(){

public boolean hasSelectValue() {
return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE);
}

public boolean hasOffset() {
return this.getOffset() != null;
}

public boolean hasLimit() {
return this.getLimit() != null;
}

public Integer getLimit() {
return this.limit != null ? this.limit : (this.limit = super.getInt("limit"));
}

public Integer getOffset() {
return this.offset != null ? this.offset : (this.offset = super.getInt("offset"));
}
}
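
The two new properties are read lazily from the backing JSON, mirroring the existing accessors, and stay null when the query has no OFFSET LIMIT clause. A minimal caller-side sketch of how the pipeline builder further down is expected to branch on them (the query text and the getQueryInfo() helper are illustrative, not part of this change):

// Hypothetical consumer of the new accessors; getQueryInfo() stands in for
// however the query plan is obtained and is not a real SDK method.
QueryInfo queryInfo = getQueryInfo();   // e.g. for: SELECT * FROM c OFFSET 10 LIMIT 5

if (queryInfo.hasOffset()) {
    int documentsToSkip = queryInfo.getOffset();   // 10 for the query above
    // wire a skip stage into the execution pipeline
}
if (queryInfo.hasLimit()) {
    int documentsToTake = queryInfo.getLimit();    // 5 for the query above
    // wire a take stage into the execution pipeline
}
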
OffsetContinuationToken.java
@@ -0,0 +1,67 @@
package com.microsoft.azure.cosmosdb.rx.internal.query;

import com.microsoft.azure.cosmosdb.JsonSerializable;
import com.microsoft.azure.cosmosdb.rx.internal.Utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OffsetContinuationToken extends JsonSerializable {
private static final String TOKEN_PROPERTY_NAME = "sourceToken";
private static final String OFFSET_PROPERTY_NAME = "offset";
private static final Logger logger = LoggerFactory.getLogger(OffsetContinuationToken.class);

public OffsetContinuationToken(int offset, String sourceToken) {

if (offset < 0) {
throw new IllegalArgumentException("offset should be non negative");
}

this.setOffset(offset);
this.setSourceToken(sourceToken);
}

public OffsetContinuationToken(String serializedCompositeToken) {
super(serializedCompositeToken);
this.getOffset();
this.getSourceToken();
}

private void setSourceToken(String sourceToken) {
super.set(TOKEN_PROPERTY_NAME, sourceToken);
}

private void setOffset(int offset) {
super.set(OFFSET_PROPERTY_NAME, offset);
}

public String getSourceToken() {
return super.getString(TOKEN_PROPERTY_NAME);
}

public int getOffset() {
return super.getInt(OFFSET_PROPERTY_NAME);
}

public static boolean tryParse(String serializedOffsetContinuationToken,
Utils.ValueHolder<OffsetContinuationToken> outOffsetContinuationToken) {
if (StringUtils.isEmpty(serializedOffsetContinuationToken)) {
return false;
}

boolean parsed;
try {
outOffsetContinuationToken.v = new OffsetContinuationToken(serializedOffsetContinuationToken);
parsed = true;
} catch (Exception ex) {
logger.debug("Received exception {} when trying to parse: {}",
ex.getMessage(),
serializedOffsetContinuationToken);
parsed = false;
outOffsetContinuationToken.v = null;
}

return parsed;
}

}
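
Roughly, the token carries the offset that still has to be skipped together with the wrapped source continuation, and tryParse recovers both on the next request. A hedged sketch of that round trip (values are illustrative; toJson() is assumed to come from JsonSerializable as elsewhere in the SDK):

OffsetContinuationToken token = new OffsetContinuationToken(10, "sourceTokenFromBackend");
String serialized = token.toJson();   // e.g. {"offset":10,"sourceToken":"sourceTokenFromBackend"}

Utils.ValueHolder<OffsetContinuationToken> out = new Utils.ValueHolder<>();
if (OffsetContinuationToken.tryParse(serialized, out)) {
    int remainingOffset = out.v.getOffset();        // 10
    String sourceToken = out.v.getSourceToken();    // "sourceTokenFromBackend"
}

// Malformed input is logged and swallowed; tryParse just reports failure.
boolean ok = OffsetContinuationToken.tryParse("not a token", out);   // false, out.v == null
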
PipelinedDocumentQueryExecutionContext.java
@@ -106,14 +106,36 @@ public static <T extends Resource> Observable<PipelinedDocumentQueryExecutionCon
createAggregateComponentFunction = createBaseComponentFunction;
}

Function<String, Observable<IDocumentQueryExecutionComponent<T>>> createSkipComponentFunction;
if (queryInfo.hasOffset()) {
createSkipComponentFunction = (continuationToken) -> {
return SkipDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction,
queryInfo.getOffset(),
continuationToken);
};
} else {
createSkipComponentFunction = createAggregateComponentFunction;
}

Function<String, Observable<IDocumentQueryExecutionComponent<T>>> createTopComponentFunction;
if (queryInfo.hasTop()) {
createTopComponentFunction = (continuationToken) -> {
return TopDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction,
return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction,
queryInfo.getTop(), continuationToken);
};
} else {
createTopComponentFunction = createAggregateComponentFunction;
createTopComponentFunction = createSkipComponentFunction;
}

Function<String, Observable<IDocumentQueryExecutionComponent<T>>> createTakeComponentFunction;
if (queryInfo.hasLimit()) {
createTakeComponentFunction = (continuationToken) -> {
return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction,
queryInfo.getLimit(),
continuationToken);
};
} else {
createTakeComponentFunction = createTopComponentFunction;
}

int actualPageSize = Utils.getValueOrDefault(feedOptions.getMaxItemCount(),
@@ -124,7 +146,7 @@ public static <T extends Resource> Observable<PipelinedDocumentQueryExecutionCon
}

int pageSize = Math.min(actualPageSize, Utils.getValueOrDefault(queryInfo.getTop(), (actualPageSize)));
return createTopComponentFunction.apply(feedOptions.getRequestContinuation())
return createTakeComponentFunction.apply(feedOptions.getRequestContinuation())
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId));
}

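With these changes the component chain becomes base → aggregate → skip (OFFSET) → top (TOP) → take (LIMIT), where the take stage simply reuses TopDocumentQueryExecutionContext with the LIMIT value. At the public surface this is what lets an OFFSET LIMIT query flow through the pipeline; a hedged usage sketch with the v2 async client (the collection link and query text are placeholders, and the client is assumed to be built elsewhere):

// Sketch only: assumes an AsyncDocumentClient configured against a real account.
void queryWithOffsetLimit(AsyncDocumentClient client) {
    FeedOptions options = new FeedOptions();
    options.setEnableCrossPartitionQuery(true);
    options.setMaxItemCount(100);

    Observable<FeedResponse<Document>> pages = client.queryDocuments(
            "dbs/testdb/colls/testcoll",
            "SELECT * FROM c ORDER BY c._ts OFFSET 10 LIMIT 5",
            options);

    // Each emitted page has already had the offset and limit applied client-side.
    pages.subscribe(page -> System.out.println(page.getResults().size() + " documents in page"));
}
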
SkipDocumentQueryExecutionContext.java
@@ -0,0 +1,84 @@
package com.microsoft.azure.cosmosdb.rx.internal.query;

import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.rx.internal.Utils;
import rx.Observable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class SkipDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionComponent<T> {

private final IDocumentQueryExecutionComponent<T> component;
private int skipCount;

SkipDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int skipCount) {
if (component == null){
throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null");
}
this.component = component;
this.skipCount = skipCount;
}

public static <T extends Resource> Observable<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Observable<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
int skipCount,
String continuationToken) {
OffsetContinuationToken offsetContinuationToken;
Utils.ValueHolder<OffsetContinuationToken> outOffsetContinuationToken = new Utils.ValueHolder<>();

if (continuationToken != null) {
if (!OffsetContinuationToken.tryParse(continuationToken, outOffsetContinuationToken)) {
String message = String.format("Invalid JSON in continuation token %s for Skip~Context",
continuationToken);
DocumentClientException dce = new DocumentClientException(HttpConstants.StatusCodes.BADREQUEST,
message);
return Observable.error(dce);
}

offsetContinuationToken = outOffsetContinuationToken.v;
} else {
offsetContinuationToken = new OffsetContinuationToken(skipCount, null);
}

return createSourceComponentFunction.apply(offsetContinuationToken.getSourceToken())
.map(component -> new SkipDocumentQueryExecutionContext<>(component, offsetContinuationToken.getOffset()));
}

@Override
public Observable<FeedResponse<T>> drainAsync(int maxPageSize) {

return this.component.drainAsync(maxPageSize).map(tFeedResponse -> {

List<T> documentsAfterSkip =
tFeedResponse.getResults().stream().skip(this.skipCount).collect(Collectors.toList());

int numberOfDocumentsSkipped = tFeedResponse.getResults().size() - documentsAfterSkip.size();
this.skipCount -= numberOfDocumentsSkipped;

Map<String, String> headers = new HashMap<>(tFeedResponse.getResponseHeaders());
if (this.skipCount >= 0) {
// Add Offset Continuation Token
String sourceContinuationToken = tFeedResponse.getResponseContinuation();
OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(this.skipCount,
sourceContinuationToken);
headers.put(HttpConstants.HttpHeaders.CONTINUATION, offsetContinuationToken.toJson());
}

return BridgeInternal.createFeedResponseWithQueryMetrics(documentsAfterSkip, headers,
tFeedResponse.getQueryMetrics());
});
}

IDocumentQueryExecutionComponent<T> getComponent() {
return this.component;
}

}
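
The offset is applied client-side and can span pages: each drained page consumes as much of the remaining skip count as it can, and whatever is left over is re-emitted through the OffsetContinuationToken. A self-contained sketch of that bookkeeping with plain collections (no SDK types; two illustrative pages and an OFFSET of 4):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SkipAcrossPagesSketch {
    public static void main(String[] args) {
        // Two backend pages of 3 documents each, and an OFFSET of 4 still to apply.
        List<List<String>> pages = Arrays.asList(
                Arrays.asList("d1", "d2", "d3"),
                Arrays.asList("d4", "d5", "d6"));
        int skipCount = 4;

        for (List<String> page : pages) {
            // Same arithmetic as drainAsync above: skip what we can, carry the rest.
            List<String> afterSkip = page.stream().skip(skipCount).collect(Collectors.toList());
            skipCount -= page.size() - afterSkip.size();
            System.out.println(afterSkip + ", remaining offset " + skipCount);
        }
        // Prints: [] with remaining offset 1, then [d5, d6] with remaining offset 0.
    }
}
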
@@ -87,6 +87,9 @@ public Observable<FeedResponse<T>> drainAsync(int maxPageSize) {
if (this.component instanceof AggregateDocumentQueryExecutionContext<?>) {
context = (ParallelDocumentQueryExecutionContextBase<T>) ((AggregateDocumentQueryExecutionContext<T>) this.component)
.getComponent();
} else if (this.component instanceof SkipDocumentQueryExecutionContext<?>) {
context = (ParallelDocumentQueryExecutionContextBase<T>) ((SkipDocumentQueryExecutionContext<T>) this.component)
.getComponent();
} else {
context = (ParallelDocumentQueryExecutionContextBase<T>) this.component;
}