diff --git a/gax-java/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java b/gax-java/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java index 5b8599e1e8..bc619346ef 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java @@ -52,10 +52,14 @@ import com.google.common.base.Stopwatch; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -66,6 +70,23 @@ @ExtendWith(MockitoExtension.class) public abstract class AbstractRetryingExecutorTest { + private static final int DEFAULT_AWAIT_TERMINATION_SEC = 10; + + // This is the default executor service for RetryingExecutor tests unless + // a local executor is specifically used + ScheduledExecutorService scheduledExecutorService; + + @BeforeEach + void setup() { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + + @AfterEach + void cleanup() throws InterruptedException { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService.awaitTermination(DEFAULT_AWAIT_TERMINATION_SEC, TimeUnit.SECONDS); + } + public static Stream data() { return Stream.of(Arguments.of(false), Arguments.of(true)); } diff --git a/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index 5a65895ef0..203475602c 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -47,22 +47,17 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest { - private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); // Number of test runs, essential for multithreaded tests. private static final int EXECUTIONS_COUNT = 5; @Override protected RetryingExecutorWithContext getExecutor(RetryAlgorithm retryAlgorithm) { - return getRetryingExecutor(retryAlgorithm, scheduler); + return getRetryingExecutor(retryAlgorithm, scheduledExecutorService); } @Override @@ -78,30 +73,24 @@ private RetryingExecutorWithContext getRetryingExecutor( return new ScheduledRetryingExecutor<>(retryAlgorithm, scheduler); } - @AfterEach - void after() { - scheduler.shutdownNow(); - } - @Test void testSuccessWithFailuresPeekAttempt() throws Exception { + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L)) + .setMaxAttempts(100) + .build(); for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) { - final int maxRetries = 100; - ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer); - RetrySettings retrySettings = - FAST_RETRY_SETTINGS - .toBuilder() - .setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L)) - .setMaxAttempts(maxRetries) - .build(); - RetryingExecutorWithContext executor = - getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); + getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService); RetryingFuture future = - executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer)); + executor.createFuture( + callable, + FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings)); callable.setExternalFuture(future); assertNull(future.peekAttemptResult()); @@ -131,28 +120,28 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { assertFutureSuccess(future); assertEquals(15, future.getAttemptSettings().getAttemptCount()); assertTrue(failedAttempts > 0); - localExecutor.shutdownNow(); } } @Test void testSuccessWithFailuresGetAttempt() throws Exception { + int maxRetries = 100; + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L)) + .setMaxAttempts(maxRetries) + .build(); for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) { - final int maxRetries = 100; - ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer); - RetrySettings retrySettings = - FAST_RETRY_SETTINGS - .toBuilder() - .setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L)) - .setMaxAttempts(maxRetries) - .build(); RetryingExecutorWithContext executor = - getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); + getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService); RetryingFuture future = - executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer)); + executor.createFuture( + callable, + FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings)); callable.setExternalFuture(future); assertNull(future.peekAttemptResult()); @@ -185,75 +174,64 @@ void testSuccessWithFailuresGetAttempt() throws Exception { assertFutureSuccess(future); assertEquals(15, future.getAttemptSettings().getAttemptCount()); assertTrue(checks > 1 && checks <= maxRetries, "checks is equal to " + checks); - localExecutor.shutdownNow(); } } - @ParameterizedTest - @MethodSource("data") - void testCancelGetAttempt(boolean withCustomRetrySettings) throws Exception { - setUp(withCustomRetrySettings); - for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) { - ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); - final int maxRetries = 20; - - FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer); - RetrySettings retrySettings = - FAST_RETRY_SETTINGS - .toBuilder() - .setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L)) - .setMaxAttempts(maxRetries) - .build(); - - RetryingExecutorWithContext executor = - getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); - RetryingFuture future = executor.createFuture(callable, retryingContext); - callable.setExternalFuture(future); - - assertNull(future.peekAttemptResult()); - assertSame(future.getAttemptResult(), future.getAttemptResult()); - assertFalse(future.getAttemptResult().isDone()); - assertFalse(future.getAttemptResult().isCancelled()); - - future.setAttemptFuture(executor.submit(future)); - - CustomException exception; - CancellationException cancellationException = null; - int checks = 0; - int failedCancelations = 0; - do { - exception = null; - checks++; - Future attemptResult = future.getAttemptResult(); - try { - attemptResult.get(); - assertNotNull(future.peekAttemptResult()); - } catch (CancellationException e) { - cancellationException = e; - } catch (ExecutionException e) { - exception = (CustomException) e.getCause(); - } - assertTrue(attemptResult.isDone()); - if (!future.cancel(true)) { - failedCancelations++; - } - } while (exception != null && checks < maxRetries); - - assertTrue(future.isDone()); - assertNotNull(cancellationException); - // future.cancel(true) may return false sometimes, which is ok. Also, the every cancellation - // of - // an already cancelled future should return false (this is what -1 means here) - assertEquals(2, checks - (failedCancelations - 1)); - assertTrue(future.getAttemptSettings().getAttemptCount() > 0); - assertFutureCancel(future); - localExecutor.shutdownNow(); - } + @Test + void testCancelGetAttempt() throws Exception { + int maxRetries = 100; + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRpcTimeoutDuration(java.time.Duration.ofMillis(50L)) + .setMaxRpcTimeoutDuration(java.time.Duration.ofMillis(50L)) + .setTotalTimeoutDuration(java.time.Duration.ofMillis(5000L)) + .setMaxAttempts(maxRetries) + .build(); + FailingCallable callable = new FailingCallable(maxRetries - 1, "request", "SUCCESS", tracer); + + RetryingExecutorWithContext executor = + getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService); + RetryingFuture future = + executor.createFuture( + callable, + FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings)); + callable.setExternalFuture(future); + + assertNull(future.peekAttemptResult()); + assertSame(future.getAttemptResult(), future.getAttemptResult()); + assertFalse(future.getAttemptResult().isDone()); + assertFalse(future.getAttemptResult().isCancelled()); + + future.setAttemptFuture(executor.submit(future)); + + CustomException exception; + CancellationException cancellationException = null; + int checks = 0; + do { + exception = null; + checks++; + Future attemptResult = future.getAttemptResult(); + try { + attemptResult.get(); + assertNotNull(future.peekAttemptResult()); + } catch (CancellationException e) { + cancellationException = e; + } catch (ExecutionException e) { + exception = (CustomException) e.getCause(); + } + future.cancel(true); + assertTrue(attemptResult.isDone()); + } while (exception != null && checks < maxRetries); + + assertTrue(future.isDone()); + assertNotNull(cancellationException); + assertTrue(future.getAttemptSettings().getAttemptCount() > 0); + assertFutureCancel(future); } @Test void testCancelOuterFutureAfterStart() throws Exception { - ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); RetrySettings retrySettings = FAST_RETRY_SETTINGS .toBuilder() @@ -278,9 +256,11 @@ void testCancelOuterFutureAfterStart() throws Exception { for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) { FailingCallable callable = new FailingCallable(4, "request", "SUCCESS", tracer); RetryingExecutorWithContext executor = - getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); + getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService); RetryingFuture future = - executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer)); + executor.createFuture( + callable, + FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings)); callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); @@ -301,27 +281,29 @@ void testCancelOuterFutureAfterStart() throws Exception { assertTrue(future.getAttemptSettings().getAttemptCount() > 0); assertTrue(future.getAttemptSettings().getAttemptCount() < 4); } - localExecutor.shutdown(); - localExecutor.awaitTermination(10, TimeUnit.SECONDS); } @Test void testCancelProxiedFutureAfterStart() throws Exception { + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L)) + .setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L)) + .setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L)) + .build(); // this is a heavy test, which takes a lot of time, so only few executions. for (int executionsCount = 0; executionsCount < 2; executionsCount++) { + // Use a test local executor for this test case due to the reasons listed below ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); FailingCallable callable = new FailingCallable(5, "request", "SUCCESS", tracer); - RetrySettings retrySettings = - FAST_RETRY_SETTINGS - .toBuilder() - .setInitialRetryDelayDuration(java.time.Duration.ofMillis(1_000L)) - .setMaxRetryDelayDuration(java.time.Duration.ofMillis(1_000L)) - .setTotalTimeoutDuration(java.time.Duration.ofMillis(10_0000L)) - .build(); + RetryingExecutorWithContext executor = getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); RetryingFuture future = - executor.createFuture(callable, FakeCallContext.createDefault().withTracer(tracer)); + executor.createFuture( + callable, + FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings)); callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); @@ -330,8 +312,8 @@ void testCancelProxiedFutureAfterStart() throws Exception { // Note that shutdownNow() will not cancel internal FutureTasks automatically, which // may potentially cause another thread handing on RetryingFuture#get() call forever. // Canceling the tasks returned by shutdownNow() also does not help, because of missing - // feature - // in guava's ListenableScheduledFuture, which does not cancel itself, when its delegate is + // feature in guava's ListenableScheduledFuture, which does not cancel itself, when its + // delegate is // canceled. // So only the graceful shutdown() is supported properly. localExecutor.shutdown();