Skip to content

Commit

Permalink
rework #176 with a simpler iteration until further API refining
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Oct 18, 2017
1 parent 5fd4a81 commit 21fd7a9
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/main/java/reactor/ipc/netty/channel/ChannelOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> Cha
}
}

final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>>
handler;
final Channel channel;
final FluxReceive inbound;
final DirectProcessor<Void> onInactive;
final ContextHandler<?> context;
final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>>
handler;
final Channel channel;
final FluxReceive inbound;
final DirectProcessor<Void> onInactive;
final ContextHandler<?> context;
@SuppressWarnings("unchecked")
volatile Subscription outboundSubscription;
volatile Subscription outboundSubscription;
protected ChannelOperations(Channel channel,
ChannelOperations<INBOUND, OUTBOUND> replaced) {
this(channel, replaced.handler, replaced.context, replaced.onInactive);
Expand All @@ -146,8 +146,16 @@ protected ChannelOperations(Channel channel,
this.context = Objects.requireNonNull(context, "context");
this.inbound = new FluxReceive(this);
this.onInactive = processor;
Subscription[] _s = new Subscription[1];
Mono.fromDirect(context.onCloseOrRelease(channel))
.doOnSubscribe(s -> _s[0] = s)
.subscribe(onInactive);

if(_s[0] != null) { //remove closeFuture listener ref by onCloseOrRelease
// subscription when onInactive is called for any reason from
// onHandlerTerminate
onInactive.subscribe(null, null, _s[0]::cancel);
}
}

@Override
Expand Down

0 comments on commit 21fd7a9

Please sign in to comment.