Skip to content

Commit

Permalink
chore: Fix testCancelGetAttempt flaky test (#3592)
Browse files Browse the repository at this point in the history
Fixes #2851
  • Loading branch information
lqiu96 authored Feb 6, 2025
1 parent 81e21f2 commit c7d11d4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@
import com.google.common.base.Stopwatch;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -66,6 +70,23 @@
@ExtendWith(MockitoExtension.class)
public abstract class AbstractRetryingExecutorTest {

private static final int DEFAULT_AWAIT_TERMINATION_SEC = 10;

// This is the default executor service for RetryingExecutor tests unless
// a local executor is specifically used
ScheduledExecutorService scheduledExecutorService;

@BeforeEach
void setup() {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}

@AfterEach
void cleanup() throws InterruptedException {
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(DEFAULT_AWAIT_TERMINATION_SEC, TimeUnit.SECONDS);
}

public static Stream<Arguments> data() {
return Stream.of(Arguments.of(false), Arguments.of(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,17 @@
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

// Number of test runs, essential for multithreaded tests.
private static final int EXECUTIONS_COUNT = 5;

@Override
protected RetryingExecutorWithContext<String> getExecutor(RetryAlgorithm<String> retryAlgorithm) {
return getRetryingExecutor(retryAlgorithm, scheduler);
return getRetryingExecutor(retryAlgorithm, scheduledExecutorService);
}

@Override
Expand All @@ -78,30 +73,24 @@ private RetryingExecutorWithContext<String> getRetryingExecutor(
return new ScheduledRetryingExecutor<>(retryAlgorithm, scheduler);
}

@AfterEach
void after() {
scheduler.shutdownNow();
}

@Test
void testSuccessWithFailuresPeekAttempt() throws Exception {
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(100)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
final int maxRetries = 100;

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);

RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
Expand Down Expand Up @@ -131,28 +120,28 @@ void testSuccessWithFailuresPeekAttempt() throws Exception {
assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(failedAttempts > 0);
localExecutor.shutdownNow();
}
}

@Test
void testSuccessWithFailuresGetAttempt() throws Exception {
int maxRetries = 100;
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
final int maxRetries = 100;

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
Expand Down Expand Up @@ -185,75 +174,64 @@ void testSuccessWithFailuresGetAttempt() throws Exception {
assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(checks > 1 && checks <= maxRetries, "checks is equal to " + checks);
localExecutor.shutdownNow();
}
}

@ParameterizedTest
@MethodSource("data")
void testCancelGetAttempt(boolean withCustomRetrySettings) throws Exception {
setUp(withCustomRetrySettings);
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
final int maxRetries = 20;

FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.getAttemptResult(), future.getAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

CustomException exception;
CancellationException cancellationException = null;
int checks = 0;
int failedCancelations = 0;
do {
exception = null;
checks++;
Future<String> attemptResult = future.getAttemptResult();
try {
attemptResult.get();
assertNotNull(future.peekAttemptResult());
} catch (CancellationException e) {
cancellationException = e;
} catch (ExecutionException e) {
exception = (CustomException) e.getCause();
}
assertTrue(attemptResult.isDone());
if (!future.cancel(true)) {
failedCancelations++;
}
} while (exception != null && checks < maxRetries);

assertTrue(future.isDone());
assertNotNull(cancellationException);
// future.cancel(true) may return false sometimes, which is ok. Also, the every cancellation
// of
// an already cancelled future should return false (this is what -1 means here)
assertEquals(2, checks - (failedCancelations - 1));
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertFutureCancel(future);
localExecutor.shutdownNow();
}
@Test
void testCancelGetAttempt() throws Exception {
int maxRetries = 100;
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRpcTimeoutDuration(java.time.Duration.ofMillis(50L))
.setMaxRpcTimeoutDuration(java.time.Duration.ofMillis(50L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L))
.setMaxAttempts(maxRetries)
.build();
FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer);

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.getAttemptResult(), future.getAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

CustomException exception;
CancellationException cancellationException = null;
int checks = 0;
do {
exception = null;
checks++;
Future<String> attemptResult = future.getAttemptResult();
try {
attemptResult.get();
assertNotNull(future.peekAttemptResult());
} catch (CancellationException e) {
cancellationException = e;
} catch (ExecutionException e) {
exception = (CustomException) e.getCause();
}
future.cancel(true);
assertTrue(attemptResult.isDone());
} while (exception != null && checks < maxRetries);

assertTrue(future.isDone());
assertNotNull(cancellationException);
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertFutureCancel(future);
}

@Test
void testCancelOuterFutureAfterStart() throws Exception {
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
Expand All @@ -278,9 +256,11 @@ void testCancelOuterFutureAfterStart() throws Exception {
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
FailingCallable callable = new FailingCallable(4, "request", "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

Expand All @@ -301,27 +281,29 @@ void testCancelOuterFutureAfterStart() throws Exception {
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);
}
localExecutor.shutdown();
localExecutor.awaitTermination(10, TimeUnit.SECONDS);
}

@Test
void testCancelProxiedFutureAfterStart() throws Exception {
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L))
.build();
// this is a heavy test, which takes a lot of time, so only few executions.
for (int executionsCount = 0; executionsCount < 2; executionsCount++) {
// Use a test local executor for this test case due to the reasons listed below
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(5, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L))
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

Expand All @@ -330,8 +312,8 @@ void testCancelProxiedFutureAfterStart() throws Exception {
// Note that shutdownNow() will not cancel internal FutureTasks automatically, which
// may potentially cause another thread handing on RetryingFuture#get() call forever.
// Canceling the tasks returned by shutdownNow() also does not help, because of missing
// feature
// in guava's ListenableScheduledFuture, which does not cancel itself, when its delegate is
// feature in guava's ListenableScheduledFuture, which does not cancel itself, when its
// delegate is
// canceled.
// So only the graceful shutdown() is supported properly.
localExecutor.shutdown();
Expand Down

0 comments on commit c7d11d4

Please sign in to comment.