Skip to content

Commit

Permalink
fix #2070 Avoid cancel in toFuture#onNext on Mono source
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 11, 2020
1 parent 93e4c96 commit 6bdb0d6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4319,7 +4319,7 @@ public final Mono<Tuple2<Long, T>> timestamp(Scheduler scheduler) {
* @return a {@link CompletableFuture}
*/
public final CompletableFuture<T> toFuture() {
return subscribeWith(new MonoToCompletableFuture<>());
return subscribeWith(new MonoToCompletableFuture<>(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
final class MonoToCompletableFuture<T> extends CompletableFuture<T> implements CoreSubscriber<T> {

final AtomicReference<Subscription> ref = new AtomicReference<>();
final boolean cancelSourceOnNext;

MonoToCompletableFuture(boolean sourceCanEmitMoreThanOnce) {
this.cancelSourceOnNext = sourceCanEmitMoreThanOnce;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
Expand Down Expand Up @@ -55,7 +60,9 @@ public void onNext(T t) {
Subscription s = ref.getAndSet(null);
if (s != null) {
complete(t);
s.cancel();
if (cancelSourceOnNext) {
s.cancel();
}
}
else {
Operators.onNextDropped(t, currentContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package reactor.core.publisher;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

public class MonoToCompletableFutureTest {

Expand All @@ -31,14 +34,18 @@ public void normal() throws Exception {
assertThat(f.get()).isEqualTo(1);
}

@Test(expected = Exception.class)
public void error() throws Exception {
@Test
public void error() {
CompletableFuture<Integer> f =
Mono.<Integer>error(new Exception("test")).toFuture();
Mono.<Integer>error(new IllegalStateException("test")).toFuture();

assertThat(f.isDone()).isTrue();
assertThat(f.isCompletedExceptionally()).isTrue();
f.get();

assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(f::get)
.withCauseExactlyInstanceOf(IllegalStateException.class)
.withMessage("java.lang.IllegalStateException: test");
}

@Test
Expand All @@ -47,4 +54,28 @@ public void empty() throws Exception {

assertThat(f.get()).isNull();
}

@Test
public void monoSourceIsntCancelled() {
AtomicBoolean flag = new AtomicBoolean();

assertThat(Mono.just("value")
.doOnCancel(() -> flag.set(true))
.toFuture()
).isCompletedWithValue("value");

assertThat(flag).as("cancelled").isFalse();
}

@Test
public void sourceCanBeCancelledExplicitlyByOnNext() {
AtomicBoolean flag = new AtomicBoolean();

assertThat(Flux.just("value")
.doOnCancel(() -> flag.set(true))
.subscribeWith(new MonoToCompletableFuture<>(true))
).isCompletedWithValue("value");

assertThat(flag).as("cancelled").isTrue();
}
}

0 comments on commit 6bdb0d6

Please sign in to comment.