Skip to content

Commit

Permalink
React to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tvaron3 committed Jan 21, 2025
1 parent f296c7d commit cffc19a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
Expand Down Expand Up @@ -70,6 +71,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -119,6 +121,14 @@ public static Object[][] changeFeedQueryEndLSNDataProvider() {
};
}

@DataProvider(name = "changeFeedQueryPrefetchingDataProvider")
public static Object[][] changeFeedQueryPrefetchingDataProvider() {
return new Object[][]{
{ChangeFeedMode.FULL_FIDELITY},
{ ChangeFeedMode.INCREMENTAL},
};
}

@DataProvider(name = "changeFeedQueryEndLSNHangDataProvider")
public static Object[][] changeFeedQueryEndLSNHangDataProvider() {
return new Object[][]{
Expand Down Expand Up @@ -323,27 +333,59 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void asyncChangeFeedPrefetching() throws Exception {
@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
this.createContainer(
(cp) -> cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy())
(cp) -> {
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
return cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy());
}
return cp.setChangeFeedPolicy(ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)));
}
);
insertDocuments(1, 20);

CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10);
CosmosChangeFeedRequestOptions options;
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.setMaxItemCount(10).allVersionsAndDeletes();
} else {
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10);
}
AtomicInteger count = new AtomicInteger(0);
// Will keep grabbing pages
createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) ->
count.incrementAndGet()).byPage().subscribe();

insertDocuments(5, 20);
AtomicReference<String> continuation = new AtomicReference<>("");
createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> {
count.incrementAndGet();
continuation.set(r.getContinuationToken());
}
).byPage().subscribe();

CosmosChangeFeedRequestOptions optionsFF = null;
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
insertDocuments(5, 20);
count.set(0);
optionsFF = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(continuation.get())
.setMaxItemCount(10).allVersionsAndDeletes();
createdContainer.asyncContainer.queryChangeFeed(optionsFF, ObjectNode.class).handle((r) -> {
count.incrementAndGet();
continuation.set(r.getContinuationToken());
}
).byPage().subscribe();
}
Thread.sleep(3000);
assertThat(count.get()).isNotEqualTo(2);
assertThat(count.get()).isGreaterThan(2);

if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
// full fidelity is only from now so need to insert more documents
insertDocuments(5, 20);
}
count.set(0);
// should only get two pages
createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) ->
count.incrementAndGet()).byPage().take(2, true).subscribe();
createdContainer.asyncContainer.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)? optionsFF
: options, ObjectNode.class).handle((r) -> count.incrementAndGet())
.byPage().take(2, true).subscribe();
Thread.sleep(3000);
assertThat(count.get()).isEqualTo(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase {
private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper();

private CosmosAsyncDatabase createdDatabase;
// private final String databaseId = "testdb1";
// private final String hostName = "TestHost1";
private final String hostName = RandomStringUtils.randomAlphabetic(6);
private final int FEED_COUNT = 10;
private final int CHANGE_FEED_PROCESSOR_TIMEOUT = 5000;
Expand Down

0 comments on commit cffc19a

Please sign in to comment.