Skip to content

Commit

Permalink
Fix #317 Forward user-facing cancel to send(Publisher) cancel (#335)
Browse files Browse the repository at this point in the history
Fix #317 Forward user-facing cancel to send(Publisher) cancel
Reduce instance allocations for send(Publisher) with context.
Fix possible listener leak in FutureMono for immediately cancelling subscription.

Will not not propagate cancel to future (to be deprecated anyway in Netty 5)
fix context propagation optimization
  • Loading branch information
smaldini authored Apr 25, 2018
1 parent 9275373 commit 79fe5b6
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 210 deletions.
215 changes: 204 additions & 11 deletions src/main/java/reactor/ipc/netty/FutureMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@
package reactor.ipc.netty;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

Expand Down Expand Up @@ -65,7 +75,31 @@ public static <F extends Future<Void>> Mono<Void> from(F future) {
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferredFuture) {
return new DeferredFutureMono<>((context) -> deferredFuture.get());
return new DeferredFutureMono<>(deferredFuture);
}

/**
* Write the passed {@link Publisher} and return a disposable {@link Mono}.
* <p>
* In addition, current method allows interaction with downstream context, so it
* may be transferred to implicitly connected upstream
* <p>
* Example:
* <p>
* <pre><code>
* Flux&lt;String&gt; dataStream = Flux.just("a", "b", "c");
* FutureMono.deferFutureWithContext((subscriberContext) ->
* context().channel()
* .writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)));
* </code></pre>
*
* @param dataStream the publisher to write
*
* @return A {@link Mono} forwarding {@link Future} success, failure and cancel
*/
public static Mono<Void> disposableWriteAndFlush(Channel channel,
Publisher<?> dataStream) {
return new DeferredWriteMono(channel, dataStream);
}

/**
Expand All @@ -91,7 +125,36 @@ public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferr
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFutureWithContext(Function<Context, F> deferredFuture) {
return new DeferredFutureMono<>(deferredFuture);
return new DeferredContextFutureMono<>(deferredFuture);
}

static <T> Publisher<T> wrapContextAndDispose(Publisher<T> publisher, ChannelFutureSubscription cfs) {
if (publisher instanceof Callable) {
return publisher;
}
else if (publisher instanceof Flux) {
return new FluxOperator<T, T>((Flux<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else if (publisher instanceof Mono) {
return new MonoOperator<T, T>((Mono<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else {
return publisher;
}
}

final static class ImmediateFutureMono<F extends Future<Void>> extends FutureMono {
Expand All @@ -115,20 +178,56 @@ public final void subscribe(final CoreSubscriber<? super Void> s) {
}

FutureSubscription<F> fs = new FutureSubscription<>(future, s);
s.onSubscribe(fs);
future.addListener(fs);
s.onSubscribe(fs);
}
}

final static class DeferredFutureMono<F extends Future<Void>> extends FutureMono {

final Function<Context, F> deferredFuture;
final Supplier<F> deferredFuture;

DeferredFutureMono(Function<Context, F> deferredFuture) {
DeferredFutureMono(Supplier<F> deferredFuture) {
this.deferredFuture =
Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f = deferredFuture.get();

if (f == null) {
Operators.error(s,
Operators.onOperatorError(new NullPointerException(
"Deferred supplied null"), s.currentContext()));
return;
}

if (f.isDone()) {
if (f.isSuccess()) {
Operators.complete(s);
}
else {
Operators.error(s, f.cause());
}
return;
}

FutureSubscription<F> fs = new FutureSubscription<>(f, s);
s.onSubscribe(fs);
f.addListener(fs);
}
}

final static class DeferredContextFutureMono<F extends Future<Void>> extends
FutureMono {

final Function<Context, F> deferredFuture;

DeferredContextFutureMono(Function<Context, F> deferredFuture) {
this.deferredFuture = Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f = deferredFuture.apply(s.currentContext());
Expand All @@ -154,17 +253,16 @@ public void subscribe(CoreSubscriber<? super Void> s) {
s.onSubscribe(fs);
f.addListener(fs);
}
}


}

final static class FutureSubscription<F extends Future<Void>> implements
GenericFutureListener<F>,
Subscription {
final static class FutureSubscription<F extends Future<Void>>
implements GenericFutureListener<F>, Subscription, Supplier<Context> {

final CoreSubscriber<? super Void> s;
final F future;

final F future;
FutureSubscription(F future, CoreSubscriber<? super Void> s) {
this.s = s;
this.future = future;
Expand All @@ -175,20 +273,115 @@ public void request(long n) {
//noop
}

@Override
public Context get() {
return s.currentContext();
}

@Override
public void cancel() {
future.removeListener(this);
}

@Override
@SuppressWarnings("unchecked")
public void operationComplete(F future) throws Exception {
public void operationComplete(F future) {
if (!future.isSuccess()) {
s.onError(future.cause());
}
else {
s.onComplete();
}
}

}

final static class DeferredWriteMono extends FutureMono {

final Channel channel;
final Publisher<?> dataStream;

DeferredWriteMono(Channel channel, Publisher<?> dataStream) {
this.channel = Objects.requireNonNull(channel, "channel");
this.dataStream = Objects.requireNonNull(dataStream, "dataStream");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
ChannelFutureSubscription cfs = new ChannelFutureSubscription(channel, s);

s.onSubscribe(cfs);

channel.writeAndFlush(wrapContextAndDispose(dataStream, cfs), cfs);
}
}

final static class ChannelFutureSubscription extends DefaultChannelPromise
implements Subscription, Function<Void, Context> {

final CoreSubscriber<? super Void> actual;
CoreSubscriber<?> ioActual;

ChannelFutureSubscription(Channel channel, CoreSubscriber<? super Void> actual) {
super(channel, channel.eventLoop());
this.actual = actual;
}

@Override
public void request(long n) {
//noop
}

@Override
public Context apply(Void aVoid) {
return actual.currentContext();
}

@Override
@SuppressWarnings("unchecked")
public void cancel() {
if (!executor().inEventLoop()) {
//must defer to be sure about ioActual field (assigned on event loop)
executor().execute(this::cancel);
return;
}
CoreSubscriber<?> ioActual = this.ioActual;
this.ioActual = null;
if (ioActual instanceof Consumer) {
((Consumer<ChannelFuture>)ioActual).accept(this);
}
}

@Override
public boolean trySuccess(Void result) {
this.ioActual = null;
boolean r = super.trySuccess(result);
actual.onComplete();
return r;
}

@Override
public ChannelPromise setSuccess(Void result) {
this.ioActual = null;
super.setSuccess(result);
actual.onComplete();
return this;
}

@Override
public boolean tryFailure(Throwable cause) {
this.ioActual = null;
boolean r = super.tryFailure(cause);
actual.onError(cause);
return r;
}

@Override
public ChannelPromise setFailure(Throwable cause) {
this.ioActual = null;
super.setFailure(cause);
actual.onError(cause);
return this;
}
}
}
9 changes: 3 additions & 6 deletions src/main/java/reactor/ipc/netty/NettyOutbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ default NettyOutbound send(Publisher<? extends ByteBuf> dataStream) {
* error during write
*/
default NettyOutbound sendByteArray(Publisher<? extends byte[]> dataStream) {
return send(PublisherContext.publisherOrScalarMap(dataStream, Unpooled::wrappedBuffer));
return send(ReactorNetty.publisherOrScalarMap(dataStream, Unpooled::wrappedBuffer));
}

/**
Expand Down Expand Up @@ -290,10 +290,7 @@ default NettyOutbound sendGroups(Publisher<? extends Publisher<? extends ByteBuf
* error during write
*/
default NettyOutbound sendObject(Publisher<?> dataStream) {
return then(FutureMono.deferFutureWithContext((subscriberContext) ->
context().channel()
.writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)))
);
return then(FutureMono.disposableWriteAndFlush(context().channel(), dataStream));
}

/**
Expand Down Expand Up @@ -337,7 +334,7 @@ default NettyOutbound sendString(Publisher<? extends String> dataStream) {
*/
default NettyOutbound sendString(Publisher<? extends String> dataStream,
Charset charset) {
return sendObject(PublisherContext.publisherOrScalarMap(dataStream, s -> alloc()
return sendObject(ReactorNetty.publisherOrScalarMap(dataStream, s -> alloc()
.buffer()
.writeBytes(s.getBytes(charset))));
}
Expand Down
Loading

0 comments on commit 79fe5b6

Please sign in to comment.