Skip to content

Commit

Permalink
test: fix BulkWriter flaky tests. (googleapis#1510)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannas authored Dec 21, 2023
1 parent 111b4e4 commit ac4cd21
Showing 1 changed file with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -121,6 +122,8 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
private DocumentReference doc1;
private DocumentReference doc2;

private ScheduledExecutorService timeoutExecutor;

public static ApiFuture<BatchWriteResponse> successResponse(int updateTimeSeconds) {
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build();
Expand Down Expand Up @@ -155,7 +158,7 @@ public void before() {
lenient().doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
testExecutor = Executors.newSingleThreadScheduledExecutor();

final ScheduledExecutorService timeoutExecutor =
timeoutExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
Expand All @@ -170,6 +173,21 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
doc2 = firestoreMock.document("coll/doc2");
}

@After
public void after() throws InterruptedException {
shutdownScheduledExecutorService(timeoutExecutor);
}

void shutdownScheduledExecutorService(ScheduledExecutorService executorService)
throws InterruptedException {
// Wait for the executor to finish after each test.
//
// This ensures the executor service is shut down properly within the given timeout, and thereby
// avoids potential hangs caused by lingering threads. Note that if a given thread is terminated
// because of the timeout, the associated test will fail, which is what we want.
executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
}

@Test
public void hasSetMethod() throws Exception {
ResponseStubber responseStubber =
Expand Down Expand Up @@ -968,7 +986,7 @@ public void doesNotSendBatchesIfDoingSoExceedsRateLimit() throws Exception {
// future at the end of the test to ensure that the timeout was called.
final SettableApiFuture<Void> timeoutCalledFuture = SettableApiFuture.create();

final ScheduledExecutorService timeoutExecutor =
final ScheduledExecutorService customExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
Expand All @@ -994,14 +1012,15 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
firestoreMock.bulkWriter(
BulkWriterOptions.builder()
.setInitialOpsPerSecond(5)
.setExecutor(timeoutExecutor)
.setExecutor(customExecutor)
.build());

for (int i = 0; i < 600; ++i) {
bulkWriter.set(firestoreMock.document("coll/doc"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
}
bulkWriter.flush();
timeoutCalledFuture.get();
shutdownScheduledExecutorService(customExecutor);
}

@Test
Expand Down Expand Up @@ -1097,7 +1116,7 @@ public void retriesWritesWhenBatchWriteFailsWithRetryableError() throws Exceptio
public void failsWritesAfterAllRetryAttemptsFail() throws Exception {
final int[] retryAttempts = {0};
final int[] scheduleWithDelayCount = {0};
final ScheduledExecutorService timeoutExecutor =
final ScheduledExecutorService customExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
Expand Down Expand Up @@ -1127,7 +1146,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
ArgumentMatchers.<UnaryCallable<BatchWriteRequest, BatchWriteResponse>>any());

bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.flush().get();

Expand All @@ -1139,14 +1158,16 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS + 1, retryAttempts[0]);
// The first attempt should not have a delay.
assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS, scheduleWithDelayCount[0]);
} finally {
shutdownScheduledExecutorService(customExecutor);
}
}

@Test
public void appliesMaxBackoffOnRetriesForResourceExhausted() throws Exception {
final int[] retryAttempts = {0};
final int[] scheduleWithDelayCount = {0};
final ScheduledExecutorService timeoutExecutor =
final ScheduledExecutorService customExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
Expand Down Expand Up @@ -1177,7 +1198,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
ArgumentMatchers.<UnaryCallable<BatchWriteRequest, BatchWriteResponse>>any());

bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
bulkWriter.addWriteErrorListener(error -> error.getFailedAttempts() < 5);

ApiFuture<WriteResult> result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
Expand All @@ -1191,6 +1212,8 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
assertEquals(5, retryAttempts[0]);
// The first attempt should not have a delay.
assertEquals(4, scheduleWithDelayCount[0]);
} finally {
shutdownScheduledExecutorService(customExecutor);
}
}

Expand All @@ -1203,7 +1226,7 @@ public void usesHighestBackoffFoundInBatch() throws Exception {
* BulkWriterOperation.DEFAULT_BACKOFF_FACTOR)
};
final int[] retryAttempts = {0};
final ScheduledExecutorService timeoutExecutor =
final ScheduledExecutorService customExecutor =
new ScheduledThreadPoolExecutor(1) {
@Override
@Nonnull
Expand Down Expand Up @@ -1244,14 +1267,15 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
responseStubber.initializeStub(batchWriteCapture, firestoreMock);

bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
bulkWriter.addWriteErrorListener(error -> error.getFailedAttempts() < 5);

bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();
responseStubber.verifyAllRequestsSent();
assertEquals(2, retryAttempts[0]);
shutdownScheduledExecutorService(customExecutor);
}

@Test
Expand Down

0 comments on commit ac4cd21

Please sign in to comment.