Skip to content

Commit

Permalink
Fix #317 Forward user-facing cancel to send(Publisher) cancel
Browse files Browse the repository at this point in the history
Reduce instance allocations for send(Publisher)
  • Loading branch information
Stephane Maldini committed Apr 25, 2018
1 parent 9275373 commit 4f1ff19
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 174 deletions.
212 changes: 201 additions & 11 deletions src/main/java/reactor/ipc/netty/FutureMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@
package reactor.ipc.netty;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

Expand Down Expand Up @@ -65,7 +75,31 @@ public static <F extends Future<Void>> Mono<Void> from(F future) {
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferredFuture) {
return new DeferredFutureMono<>((context) -> deferredFuture.get());
return new DeferredFutureMono<>(deferredFuture);
}

/**
* Write the passed {@link Publisher} and return a disposable {@link Mono}.
* <p>
* In addition, current method allows interaction with downstream context, so it
* may be transferred to implicitly connected upstream
* <p>
* Example:
* <p>
* <pre><code>
* Flux&lt;String&gt; dataStream = Flux.just("a", "b", "c");
* FutureMono.deferFutureWithContext((subscriberContext) ->
* context().channel()
* .writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)));
* </code></pre>
*
* @param dataStream the publisher to write
*
* @return A {@link Mono} forwarding {@link Future} success, failure and cancel
*/
public static Mono<Void> disposableWriteAndFlush(Channel channel,
Publisher<?> dataStream) {
return new DeferredWriteMono(channel, dataStream);
}

/**
Expand All @@ -91,7 +125,36 @@ public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferr
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFutureWithContext(Function<Context, F> deferredFuture) {
return new DeferredFutureMono<>(deferredFuture);
return new DeferredContextFutureMono<>(deferredFuture);
}

static <T> Publisher<T> wrapContextAndDispose(Publisher<T> publisher, ChannelFutureSubscription cfs) {
if (publisher instanceof Callable) {
return publisher;
}
else if (publisher instanceof Flux) {
return new FluxOperator<T, T>((Flux<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else if (publisher instanceof Mono) {
return new MonoOperator<T, T>((Mono<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else {
return publisher;
}
}

final static class ImmediateFutureMono<F extends Future<Void>> extends FutureMono {
Expand All @@ -115,20 +178,56 @@ public final void subscribe(final CoreSubscriber<? super Void> s) {
}

FutureSubscription<F> fs = new FutureSubscription<>(future, s);
s.onSubscribe(fs);
future.addListener(fs);
s.onSubscribe(fs);
}
}

final static class DeferredFutureMono<F extends Future<Void>> extends FutureMono {

final Function<Context, F> deferredFuture;
final Supplier<F> deferredFuture;

DeferredFutureMono(Function<Context, F> deferredFuture) {
DeferredFutureMono(Supplier<F> deferredFuture) {
this.deferredFuture =
Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f = deferredFuture.get();

if (f == null) {
Operators.error(s,
Operators.onOperatorError(new NullPointerException(
"Deferred supplied null"), s.currentContext()));
return;
}

if (f.isDone()) {
if (f.isSuccess()) {
Operators.complete(s);
}
else {
Operators.error(s, f.cause());
}
return;
}

FutureSubscription<F> fs = new FutureSubscription<>(f, s);
s.onSubscribe(fs);
f.addListener(fs);
}
}

final static class DeferredContextFutureMono<F extends Future<Void>> extends
FutureMono {

final Function<Context, F> deferredFuture;

DeferredContextFutureMono(Function<Context, F> deferredFuture) {
this.deferredFuture = Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f = deferredFuture.apply(s.currentContext());
Expand All @@ -154,17 +253,16 @@ public void subscribe(CoreSubscriber<? super Void> s) {
s.onSubscribe(fs);
f.addListener(fs);
}
}


}

final static class FutureSubscription<F extends Future<Void>> implements
GenericFutureListener<F>,
Subscription {
final static class FutureSubscription<F extends Future<Void>>
implements GenericFutureListener<F>, Subscription, Supplier<Context> {

final CoreSubscriber<? super Void> s;
final F future;

final F future;
FutureSubscription(F future, CoreSubscriber<? super Void> s) {
this.s = s;
this.future = future;
Expand All @@ -175,20 +273,112 @@ public void request(long n) {
//noop
}

@Override
public Context get() {
return s.currentContext();
}

@Override
public void cancel() {
future.removeListener(this);
future.cancel(true);
}

@Override
@SuppressWarnings("unchecked")
public void operationComplete(F future) throws Exception {
public void operationComplete(F future) {
if (!future.isSuccess()) {
s.onError(future.cause());
}
else {
s.onComplete();
}
}

}

final static class DeferredWriteMono extends FutureMono {

final Channel channel;
final Publisher<?> dataStream;

DeferredWriteMono(Channel channel, Publisher<?> dataStream) {
this.channel = Objects.requireNonNull(channel, "channel");
this.dataStream = Objects.requireNonNull(dataStream, "dataStream");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
ChannelFutureSubscription cfs = new ChannelFutureSubscription(channel, s);

s.onSubscribe(cfs);

channel.writeAndFlush(wrapContextAndDispose(dataStream, cfs), cfs);
}
}

final static class ChannelFutureSubscription extends DefaultChannelPromise
implements Subscription {

final CoreSubscriber<? super Void> actual;
CoreSubscriber<?> ioActual;

ChannelFutureSubscription(Channel channel, CoreSubscriber<? super Void> actual) {
super(channel, channel.eventLoop());
this.actual = actual;
}

@Override
public void request(long n) {
//noop
}

@Override
@SuppressWarnings("unchecked")
public void cancel() {
if (!executor().inEventLoop()) {
//must defer to be sure about ioActual field (assigned on event loop)
executor().execute(this::cancel);
return;
}
CoreSubscriber<?> ioActual = this.ioActual;
this.ioActual = null;
if (ioActual instanceof Consumer) {
((Consumer<ChannelFuture>)ioActual).accept(this);
}
cancel(true);
}

@Override
public boolean trySuccess(Void result) {
this.ioActual = null;
boolean r = super.trySuccess(result);
actual.onComplete();
return r;
}

@Override
public ChannelPromise setSuccess(Void result) {
this.ioActual = null;
super.setSuccess(result);
actual.onComplete();
return this;
}

@Override
public boolean tryFailure(Throwable cause) {
this.ioActual = null;
boolean r = super.tryFailure(cause);
actual.onError(cause);
return r;
}

@Override
public ChannelPromise setFailure(Throwable cause) {
this.ioActual = null;
super.setFailure(cause);
actual.onError(cause);
return this;
}
}
}
9 changes: 3 additions & 6 deletions src/main/java/reactor/ipc/netty/NettyOutbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ default NettyOutbound send(Publisher<? extends ByteBuf> dataStream) {
* error during write
*/
default NettyOutbound sendByteArray(Publisher<? extends byte[]> dataStream) {
return send(PublisherContext.publisherOrScalarMap(dataStream, Unpooled::wrappedBuffer));
return send(ReactorNetty.publisherOrScalarMap(dataStream, Unpooled::wrappedBuffer));
}

/**
Expand Down Expand Up @@ -290,10 +290,7 @@ default NettyOutbound sendGroups(Publisher<? extends Publisher<? extends ByteBuf
* error during write
*/
default NettyOutbound sendObject(Publisher<?> dataStream) {
return then(FutureMono.deferFutureWithContext((subscriberContext) ->
context().channel()
.writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)))
);
return then(FutureMono.disposableWriteAndFlush(context().channel(), dataStream));
}

/**
Expand Down Expand Up @@ -337,7 +334,7 @@ default NettyOutbound sendString(Publisher<? extends String> dataStream) {
*/
default NettyOutbound sendString(Publisher<? extends String> dataStream,
Charset charset) {
return sendObject(PublisherContext.publisherOrScalarMap(dataStream, s -> alloc()
return sendObject(ReactorNetty.publisherOrScalarMap(dataStream, s -> alloc()
.buffer()
.writeBytes(s.getBytes(charset))));
}
Expand Down
Loading

0 comments on commit 4f1ff19

Please sign in to comment.