Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix testCancelGetAttempt flaky test #3592

Merged
merged 3 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@
import com.google.common.base.Stopwatch;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -66,6 +70,23 @@
@ExtendWith(MockitoExtension.class)
public abstract class AbstractRetryingExecutorTest {

private static final int DEFAULT_AWAIT_TERMINATION_SEC = 10;

// This is the default executor service for RetryingExecutor tests unless
// a local executor is specifically used
ScheduledExecutorService scheduledExecutorService;

@BeforeEach
void setup() {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}

@AfterEach
void cleanup() throws InterruptedException {
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(DEFAULT_AWAIT_TERMINATION_SEC, TimeUnit.SECONDS);
}

public static Stream<Arguments> data() {
return Stream.of(Arguments.of(false), Arguments.of(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,17 @@
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

// Number of test runs, essential for multithreaded tests.
private static final int EXECUTIONS_COUNT = 5;

@Override
protected RetryingExecutorWithContext<String> getExecutor(RetryAlgorithm<String> retryAlgorithm) {
return getRetryingExecutor(retryAlgorithm, scheduler);
return getRetryingExecutor(retryAlgorithm, scheduledExecutorService);
}

@Override
Expand All @@ -78,30 +73,24 @@ private RetryingExecutorWithContext<String> getRetryingExecutor(
return new ScheduledRetryingExecutor<>(retryAlgorithm, scheduler);
}

@AfterEach
void after() {
scheduler.shutdownNow();
}

@Test
void testSuccessWithFailuresPeekAttempt() throws Exception {
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(100)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
final int maxRetries = 100;

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);

RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
Expand Down Expand Up @@ -131,28 +120,28 @@ void testSuccessWithFailuresPeekAttempt() throws Exception {
assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(failedAttempts > 0);
localExecutor.shutdownNow();
}
}

@Test
void testSuccessWithFailuresGetAttempt() throws Exception {
int maxRetries = 100;
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
final int maxRetries = 100;

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
Expand Down Expand Up @@ -185,75 +174,64 @@ void testSuccessWithFailuresGetAttempt() throws Exception {
assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(checks > 1 && checks <= maxRetries, "checks is equal to " + checks);
localExecutor.shutdownNow();
}
}

@ParameterizedTest
@MethodSource("data")
void testCancelGetAttempt(boolean withCustomRetrySettings) throws Exception {
setUp(withCustomRetrySettings);
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain a bit why we are removing this loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this even needs a loop to begin with. IIUC, the test should care that the future was cancelled after the first attempt. Running multiple invocations doesn't seem to be necessary.

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
final int maxRetries = 20;

FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L))
.setMaxAttempts(maxRetries)
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.getAttemptResult(), future.getAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

CustomException exception;
CancellationException cancellationException = null;
int checks = 0;
int failedCancelations = 0;
do {
exception = null;
checks++;
Future<String> attemptResult = future.getAttemptResult();
try {
attemptResult.get();
assertNotNull(future.peekAttemptResult());
} catch (CancellationException e) {
cancellationException = e;
} catch (ExecutionException e) {
exception = (CustomException) e.getCause();
}
assertTrue(attemptResult.isDone());
if (!future.cancel(true)) {
failedCancelations++;
}
} while (exception != null && checks < maxRetries);

assertTrue(future.isDone());
assertNotNull(cancellationException);
// future.cancel(true) may return false sometimes, which is ok. Also, the every cancellation
// of
// an already cancelled future should return false (this is what -1 means here)
assertEquals(2, checks - (failedCancelations - 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the failing point for #2851? Do you think it is redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you're referring to. The test is to ensure that the future was cancelled during the retry attempt.

Future should be reported as done and that one attempt was made.

assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertFutureCancel(future);
localExecutor.shutdownNow();
}
@Test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove parameterized tests since it just chooses different retry settings. Only need to run this for a single set of retry settings.

void testCancelGetAttempt() throws Exception {
int maxRetries = 100;
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRpcTimeoutDuration(java.time.Duration.ofMillis(50L))
.setMaxRpcTimeoutDuration(java.time.Duration.ofMillis(50L))
Comment on lines +186 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are explicitly set instead of defaults to have quicker cancellations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, these were increased to give additional buffer time. IIRC, the original values were 8ms.

.setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L))
.setMaxAttempts(maxRetries)
.build();
FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer);

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.getAttemptResult(), future.getAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

CustomException exception;
CancellationException cancellationException = null;
int checks = 0;
do {
exception = null;
checks++;
Future<String> attemptResult = future.getAttemptResult();
try {
attemptResult.get();
assertNotNull(future.peekAttemptResult());
} catch (CancellationException e) {
cancellationException = e;
} catch (ExecutionException e) {
exception = (CustomException) e.getCause();
}
future.cancel(true);
assertTrue(attemptResult.isDone());
} while (exception != null && checks < maxRetries);

assertTrue(future.isDone());
assertNotNull(cancellationException);
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertFutureCancel(future);
}

@Test
void testCancelOuterFutureAfterStart() throws Exception {
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
Expand All @@ -278,9 +256,11 @@ void testCancelOuterFutureAfterStart() throws Exception {
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
FailingCallable callable = new FailingCallable(4, "request", "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

Expand All @@ -301,27 +281,29 @@ void testCancelOuterFutureAfterStart() throws Exception {
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);
}
localExecutor.shutdown();
localExecutor.awaitTermination(10, TimeUnit.SECONDS);
}

@Test
void testCancelProxiedFutureAfterStart() throws Exception {
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L))
.build();
// this is a heavy test, which takes a lot of time, so only few executions.
for (int executionsCount = 0; executionsCount < 2; executionsCount++) {
// Use a test local executor for this test case due to the reasons listed below
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean above?

Suggested change
// Use a test local executor for this test case due to the reasons listed below
// Use a test local executor for this test case due to the reasons listed above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, is to reference the comments below in L312-316. I kept this the same since I don't fully understand the impacts of changing it.

ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
FailingCallable callable = new FailingCallable(5, "request", "SUCCESS", tracer);
RetrySettings retrySettings =
FAST_RETRY_SETTINGS
.toBuilder()
.setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L))
.build();

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor);
RetryingFuture<String> future =
executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer));
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

Expand All @@ -330,8 +312,8 @@ void testCancelProxiedFutureAfterStart() throws Exception {
// Note that shutdownNow() will not cancel internal FutureTasks automatically, which
// may potentially cause another thread handing on RetryingFuture#get() call forever.
// Canceling the tasks returned by shutdownNow() also does not help, because of missing
// feature
// in guava's ListenableScheduledFuture, which does not cancel itself, when its delegate is
// feature in guava's ListenableScheduledFuture, which does not cancel itself, when its
// delegate is
// canceled.
// So only the graceful shutdown() is supported properly.
localExecutor.shutdown();
Expand Down
Loading