diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java index 5e7786b1c561..a40fdeb90b72 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java @@ -158,9 +158,12 @@ private CompletableFuture> executeHttpRequest(SdkHttpFullReque // Attempt to offload the completion of the future returned from this // stage onto the future completion executor - CompletableFuture> asyncComplete = - responseHandlerFuture.whenCompleteAsync((r, t) -> completeResponseFuture(responseFuture, r, t), - futureCompletionExecutor); + CompletableFuture asyncComplete = + responseHandlerFuture.handleAsync((r, t) -> { + completeResponseFuture(responseFuture, r, t); + return null; + }, + futureCompletionExecutor); // It's possible the async execution above fails. If so, log a warning, // and just complete it synchronously. diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java index 735d33050484..df8126db2343 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java @@ -16,12 +16,14 @@ package software.amazon.awssdk.core.internal.http.pipeline.stages; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -33,6 +35,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -58,6 +61,8 @@ import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.metrics.MetricCollector; +import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.ThreadFactoryBuilder; import utils.ValidSdkObjects; @RunWith(MockitoJUnitRunner.class) @@ -157,7 +162,7 @@ public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest } @Test - public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() { + public void execute_handlerFutureCompletedNormally_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() { ExecutorService mockExecutor = mock(ExecutorService.class); doThrow(new RejectedExecutionException("Busy")).when(mockExecutor).execute(any(Runnable.class)); @@ -169,7 +174,8 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build(); TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class); - when(mockHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null)); + CompletableFuture prepareFuture = new CompletableFuture(); + when(mockHandler.prepare()).thenReturn(prepareFuture); stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies); @@ -179,10 +185,58 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl CompletableFuture executeFuture = stage.execute(requestFuture, requestContext()); long testThreadId = Thread.currentThread().getId(); - executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)).join(); + CompletableFuture afterWhenComplete = + executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)); + + prepareFuture.complete(null); + + afterWhenComplete.join(); + verify(mockExecutor).execute(any(Runnable.class)); } + @Test + public void execute_handlerFutureCompletedExceptionally_doesNotAttemptSynchronousComplete() { + String threadNamePrefix = "async-handle-test"; + ExecutorService mockExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().threadNamePrefix(threadNamePrefix).build()); + + SdkClientConfiguration config = + SdkClientConfiguration.builder() + .option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, mockExecutor) + .option(ASYNC_HTTP_CLIENT, sdkAsyncHttpClient) + .build(); + HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build(); + + TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class); + CompletableFuture prepareFuture = spy(new CompletableFuture()); + when(mockHandler.prepare()).thenReturn(prepareFuture); + + stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies); + + CompletableFuture requestFuture = CompletableFuture.completedFuture( + ValidSdkObjects.sdkHttpFullRequest().build()); + + CompletableFuture executeFuture = stage.execute(requestFuture, requestContext()); + + try { + CompletableFuture afterHandle = + executeFuture.handle((r, t) -> assertThat(Thread.currentThread().getName()).startsWith(threadNamePrefix)); + + prepareFuture.completeExceptionally(new RuntimeException("parse error")); + + afterHandle.join(); + + assertThatThrownBy(executeFuture::join) + .hasCauseInstanceOf(RuntimeException.class) + .hasMessageContaining("parse error"); + + verify(prepareFuture, times(0)).whenComplete(any()); + } finally { + mockExecutor.shutdown(); + } + } + private HttpClientDependencies clientDependencies(Duration timeout) { SdkClientConfiguration configuration = SdkClientConfiguration.builder() .option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)