diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java index af315f924..4041e0ed3 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java @@ -41,6 +41,8 @@ public final class QueryInfo extends JsonSerializable { private Collection aggregates; private Collection orderByExpressions; private String rewrittenQuery; + private Integer offset; + private Integer limit; public QueryInfo() { } @@ -90,8 +92,24 @@ public Collection getOrderByExpressions() { ? this.orderByExpressions : (this.orderByExpressions = super.getCollection("orderByExpressions", String.class)); } - - public boolean hasSelectValue(){ + + public boolean hasSelectValue() { return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE); } + + public boolean hasOffset() { + return this.getOffset() != null; + } + + public boolean hasLimit() { + return this.getLimit() != null; + } + + public Integer getLimit() { + return this.limit != null ? this.limit : (this.limit = super.getInt("limit")); + } + + public Integer getOffset() { + return this.offset != null ? this.offset : (this.offset = super.getInt("offset")); + } } \ No newline at end of file diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/OffsetContinuationToken.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/OffsetContinuationToken.java new file mode 100644 index 000000000..b656f001e --- /dev/null +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/OffsetContinuationToken.java @@ -0,0 +1,67 @@ +package com.microsoft.azure.cosmosdb.rx.internal.query; + +import com.microsoft.azure.cosmosdb.JsonSerializable; +import com.microsoft.azure.cosmosdb.rx.internal.Utils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class OffsetContinuationToken extends JsonSerializable { + private static final String TOKEN_PROPERTY_NAME = "sourceToken"; + private static final String OFFSET_PROPERTY_NAME = "offset"; + private static final Logger logger = LoggerFactory.getLogger(CompositeContinuationToken.class); + + public OffsetContinuationToken(int offset, String sourceToken) { + + if (offset < 0) { + throw new IllegalArgumentException("offset should be non negative"); + } + + this.setOffset(offset); + this.setSourceToken(sourceToken); + } + + public OffsetContinuationToken(String serializedCompositeToken) { + super(serializedCompositeToken); + this.getOffset(); + this.getSourceToken(); + } + + private void setSourceToken(String sourceToken) { + super.set(TOKEN_PROPERTY_NAME, sourceToken); + } + + private void setOffset(int offset) { + super.set(OFFSET_PROPERTY_NAME, offset); + } + + public String getSourceToken() { + return super.getString(TOKEN_PROPERTY_NAME); + } + + public int getOffset() { + return super.getInt(OFFSET_PROPERTY_NAME); + } + + public static boolean tryParse(String serializedOffsetContinuationToken, + Utils.ValueHolder outOffsetContinuationToken) { + if (StringUtils.isEmpty(serializedOffsetContinuationToken)) { + return false; + } + + boolean parsed; + try { + outOffsetContinuationToken.v = new OffsetContinuationToken(serializedOffsetContinuationToken); + parsed = true; + } catch (Exception ex) { + logger.debug("Received exception {} when trying to parse: {}", + ex.getMessage(), + serializedOffsetContinuationToken); + parsed = false; + outOffsetContinuationToken.v = null; + } + + return parsed; + } + +} diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java index ef0a0d3a8..591b3579e 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java @@ -106,14 +106,36 @@ public static Observable>> createSkipComponentFunction; + if (queryInfo.hasOffset()) { + createSkipComponentFunction = (continuationToken) -> { + return SkipDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, + queryInfo.getOffset(), + continuationToken); + }; + } else { + createSkipComponentFunction = createAggregateComponentFunction; + } + Function>> createTopComponentFunction; if (queryInfo.hasTop()) { createTopComponentFunction = (continuationToken) -> { - return TopDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, + return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction, queryInfo.getTop(), continuationToken); }; } else { - createTopComponentFunction = createAggregateComponentFunction; + createTopComponentFunction = createSkipComponentFunction; + } + + Function>> createTakeComponentFunction; + if (queryInfo.hasLimit()) { + createTakeComponentFunction = (continuationToken) -> { + return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction, + queryInfo.getLimit(), + continuationToken); + }; + } else { + createTakeComponentFunction = createTopComponentFunction; } int actualPageSize = Utils.getValueOrDefault(feedOptions.getMaxItemCount(), @@ -124,7 +146,7 @@ public static Observable new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId)); } diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/SkipDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/SkipDocumentQueryExecutionContext.java new file mode 100644 index 000000000..947b6fb84 --- /dev/null +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/SkipDocumentQueryExecutionContext.java @@ -0,0 +1,84 @@ +package com.microsoft.azure.cosmosdb.rx.internal.query; + +import com.microsoft.azure.cosmosdb.BridgeInternal; +import com.microsoft.azure.cosmosdb.DocumentClientException; +import com.microsoft.azure.cosmosdb.FeedResponse; +import com.microsoft.azure.cosmosdb.Resource; +import com.microsoft.azure.cosmosdb.internal.HttpConstants; +import com.microsoft.azure.cosmosdb.rx.internal.Utils; +import rx.Observable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public final class SkipDocumentQueryExecutionContext implements IDocumentQueryExecutionComponent { + + private final IDocumentQueryExecutionComponent component; + private int skipCount; + + SkipDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int skipCount) { + if (component == null){ + throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null"); + } + this.component = component; + this.skipCount = skipCount; + } + + public static Observable> createAsync( + Function>> createSourceComponentFunction, + int skipCount, + String continuationToken) { + OffsetContinuationToken offsetContinuationToken; + Utils.ValueHolder outOffsetContinuationToken = new Utils.ValueHolder<>(); + + if (continuationToken != null) { + if (!OffsetContinuationToken.tryParse(continuationToken, outOffsetContinuationToken)) { + String message = String.format("Invalid JSON in continuation token %s for Skip~Context", + continuationToken); + DocumentClientException dce = new DocumentClientException(HttpConstants.StatusCodes.BADREQUEST, + message); + return Observable.error(dce); + } + + offsetContinuationToken = outOffsetContinuationToken.v; + } else { + offsetContinuationToken = new OffsetContinuationToken(skipCount, null); + } + + return createSourceComponentFunction.apply(offsetContinuationToken.getSourceToken()) + .map(component -> new SkipDocumentQueryExecutionContext<>(component, offsetContinuationToken.getOffset())); + } + + @Override + public Observable> drainAsync(int maxPageSize) { + + return this.component.drainAsync(maxPageSize).map(tFeedResponse -> { + + List documentsAfterSkip = + tFeedResponse.getResults().stream().skip(this.skipCount).collect(Collectors.toList()); + + int numberOfDocumentsSkipped = tFeedResponse.getResults().size() - documentsAfterSkip.size(); + this.skipCount -= numberOfDocumentsSkipped; + + Map headers = new HashMap<>(tFeedResponse.getResponseHeaders()); + if (this.skipCount >= 0) { + // Add Offset Continuation Token + String sourceContinuationToken = tFeedResponse.getResponseContinuation(); + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(this.skipCount, + sourceContinuationToken); + headers.put(HttpConstants.HttpHeaders.CONTINUATION, offsetContinuationToken.toJson()); + } + + return BridgeInternal.createFeedResponseWithQueryMetrics(documentsAfterSkip, headers, + tFeedResponse.getQueryMetrics()); + }); + } + + IDocumentQueryExecutionComponent getComponent() { + return this.component; + } + +} diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java index afa80ca43..79904836c 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java @@ -87,6 +87,9 @@ public Observable> drainAsync(int maxPageSize) { if (this.component instanceof AggregateDocumentQueryExecutionContext) { context = (ParallelDocumentQueryExecutionContextBase) ((AggregateDocumentQueryExecutionContext) this.component) .getComponent(); + } else if (this.component instanceof SkipDocumentQueryExecutionContext) { + context = (ParallelDocumentQueryExecutionContextBase) ((SkipDocumentQueryExecutionContext) this.component) + .getComponent(); } else { context = (ParallelDocumentQueryExecutionContextBase) this.component; } diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OffsetLimitQueryTests.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OffsetLimitQueryTests.java new file mode 100644 index 000000000..bafa8687a --- /dev/null +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OffsetLimitQueryTests.java @@ -0,0 +1,210 @@ +/* + * The MIT License (MIT) + * Copyright (c) 2018 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package com.microsoft.azure.cosmosdb.rx; + +import com.microsoft.azure.cosmosdb.Database; +import com.microsoft.azure.cosmosdb.Document; +import com.microsoft.azure.cosmosdb.DocumentCollection; +import com.microsoft.azure.cosmosdb.FeedOptions; +import com.microsoft.azure.cosmosdb.FeedResponse; +import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol; +import com.microsoft.azure.cosmosdb.rx.internal.Utils.ValueHolder; +import com.microsoft.azure.cosmosdb.rx.internal.query.OffsetContinuationToken; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import rx.Observable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class OffsetLimitQueryTests extends TestSuiteBase { + private Database createdDatabase; + private DocumentCollection createdCollection; + private ArrayList docs = new ArrayList<>(); + + private String partitionKey = "mypk"; + private int firstPk = 0; + private int secondPk = 1; + private String field = "field"; + + private AsyncDocumentClient client; + + @Factory(dataProvider = "clientBuildersWithDirect") + public OffsetLimitQueryTests(AsyncDocumentClient.Builder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") + public void queryDocuments(boolean qmEnabled) { + int skipCount = 4; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + FeedOptions options = new FeedOptions(); + options.setMaxItemCount(5); + options.setEnableCrossPartitionQuery(true); + options.setPopulateQueryMetrics(qmEnabled); + options.setMaxDegreeOfParallelism(2); + Observable> queryObservable = client.queryDocuments(createdCollection.getSelfLink(), + query, options); + + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder().totalSize(takeCount).allPagesSatisfy(new FeedResponseValidator.Builder().requestChargeGreaterThanOrEqualTo(1.0).build()) + .hasValidQueryMetrics(qmEnabled) + .build(); + + try { + validateQuerySuccess(queryObservable, validator, TIMEOUT); + } catch (Throwable error) { + if (this.clientBuilder().configs.getProtocol() == Protocol.Tcp) { + String message = String.format("Direct TCP test failure: desiredConsistencyLevel=%s", + this.clientBuilder().desiredConsistencyLevel); + logger.info(message, error); + throw new SkipException(message, error); + } + throw error; + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void offsetContinuationTokenRoundTrips() { + // Positive + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(42, "asdf"); + String serialized = offsetContinuationToken.toString(); + ValueHolder outOffsetContinuationToken = new ValueHolder<>(); + + assertThat(OffsetContinuationToken.tryParse(serialized, outOffsetContinuationToken)).isTrue(); + OffsetContinuationToken deserialized = outOffsetContinuationToken.v; + + assertThat(deserialized.getOffset()).isEqualTo(42); + assertThat(deserialized.getSourceToken()).isEqualTo("asdf"); + + // Negative + ValueHolder outTakeContinuationToken = new ValueHolder(); + assertThat( + OffsetContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", outTakeContinuationToken)) + .isFalse(); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 10) + public void queryDocumentsWithTopContinuationTokens() { + int skipCount = 3; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + this.queryWithContinuationTokensAndPageSizes(query, new int[]{1, 5, 15}, takeCount); + } + + private void queryWithContinuationTokensAndPageSizes(String query, int[] pageSizes, int takeCount) { + for (int pageSize : pageSizes) { + List receivedDocuments = this.queryWithContinuationTokens(query, pageSize); + Set actualIds = new HashSet(); + for (Document document : receivedDocuments) { + actualIds.add(document.getResourceId()); + } + + assertThat(actualIds.size()).describedAs("total number of results").isEqualTo(takeCount); + } + } + + private List queryWithContinuationTokens(String query, int pageSize) { + String requestContinuation = null; + List continuationTokens = new ArrayList(); + List receivedDocuments = new ArrayList(); + + do { + FeedOptions options = new FeedOptions(); + options.setMaxItemCount(pageSize); + options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); + options.setRequestContinuation(requestContinuation); + Observable> queryObservable = + client.queryDocuments(createdCollection.getSelfLink(), query, options); + + Observable> firstPageObservable = queryObservable.first(); + VerboseTestSubscriber> testSubscriber = new VerboseTestSubscriber<>(); + firstPageObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); + + FeedResponse firstPage = testSubscriber.getOnNextEvents().get(0); + requestContinuation = firstPage.getResponseContinuation(); + receivedDocuments.addAll(firstPage.getResults()); + + continuationTokens.add(requestContinuation); + } while (requestContinuation != null); + + return receivedDocuments; + } + + public void bulkInsert(AsyncDocumentClient client) { + generateTestData(); + + for (Document doc : docs) { + createDocument(client, createdDatabase.getId(), createdCollection.getId(), doc); + } + } + + public void generateTestData() { + + for (int i = 0; i < 10; i++) { + Document d = new Document(); + d.setId(Integer.toString(i)); + d.set(field, i); + d.set(partitionKey, firstPk); + docs.add(d); + } + + for (int i = 10; i < 20; i++) { + Document d = new Document(); + d.setId(Integer.toString(i)); + d.set(field, i); + d.set(partitionKey, secondPk); + docs.add(d); + } + } + + @AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeClose(client); + } + + @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() throws Exception { + client = this.clientBuilder().build(); + createdDatabase = SHARED_DATABASE; + createdCollection = SHARED_MULTI_PARTITION_COLLECTION; + truncateCollection(SHARED_MULTI_PARTITION_COLLECTION); + + bulkInsert(client); + + waitIfNeededForReplicasToCatchUp(clientBuilder()); + } +}