diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 7e74853ad0..a0ae691c0d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -4319,7 +4319,7 @@ public final Mono> timestamp(Scheduler scheduler) { * @return a {@link CompletableFuture} */ public final CompletableFuture toFuture() { - return subscribeWith(new MonoToCompletableFuture<>()); + return subscribeWith(new MonoToCompletableFuture<>(false)); } /** diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoToCompletableFuture.java b/reactor-core/src/main/java/reactor/core/publisher/MonoToCompletableFuture.java index 0c35a57721..7fc6d1ac1a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoToCompletableFuture.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoToCompletableFuture.java @@ -27,6 +27,11 @@ final class MonoToCompletableFuture extends CompletableFuture implements CoreSubscriber { final AtomicReference ref = new AtomicReference<>(); + final boolean cancelSourceOnNext; + + MonoToCompletableFuture(boolean sourceCanEmitMoreThanOnce) { + this.cancelSourceOnNext = sourceCanEmitMoreThanOnce; + } @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -55,7 +60,9 @@ public void onNext(T t) { Subscription s = ref.getAndSet(null); if (s != null) { complete(t); - s.cancel(); + if (cancelSourceOnNext) { + s.cancel(); + } } else { Operators.onNextDropped(t, currentContext()); diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoToCompletableFutureTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoToCompletableFutureTest.java index eba73efcc1..88d95a72f7 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoToCompletableFutureTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoToCompletableFutureTest.java @@ -16,10 +16,13 @@ package reactor.core.publisher; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; public class MonoToCompletableFutureTest { @@ -31,14 +34,18 @@ public void normal() throws Exception { assertThat(f.get()).isEqualTo(1); } - @Test(expected = Exception.class) - public void error() throws Exception { + @Test + public void error() { CompletableFuture f = - Mono.error(new Exception("test")).toFuture(); + Mono.error(new IllegalStateException("test")).toFuture(); assertThat(f.isDone()).isTrue(); assertThat(f.isCompletedExceptionally()).isTrue(); - f.get(); + + assertThatExceptionOfType(ExecutionException.class) + .isThrownBy(f::get) + .withCauseExactlyInstanceOf(IllegalStateException.class) + .withMessage("java.lang.IllegalStateException: test"); } @Test @@ -47,4 +54,28 @@ public void empty() throws Exception { assertThat(f.get()).isNull(); } + + @Test + public void monoSourceIsntCancelled() { + AtomicBoolean flag = new AtomicBoolean(); + + assertThat(Mono.just("value") + .doOnCancel(() -> flag.set(true)) + .toFuture() + ).isCompletedWithValue("value"); + + assertThat(flag).as("cancelled").isFalse(); + } + + @Test + public void sourceCanBeCancelledExplicitlyByOnNext() { + AtomicBoolean flag = new AtomicBoolean(); + + assertThat(Flux.just("value") + .doOnCancel(() -> flag.set(true)) + .subscribeWith(new MonoToCompletableFuture<>(true)) + ).isCompletedWithValue("value"); + + assertThat(flag).as("cancelled").isTrue(); + } } \ No newline at end of file