diff --git a/src/main/java/reactor/ipc/netty/channel/ChannelOperations.java b/src/main/java/reactor/ipc/netty/channel/ChannelOperations.java index 49c28f7765..c299244d4b 100644 --- a/src/main/java/reactor/ipc/netty/channel/ChannelOperations.java +++ b/src/main/java/reactor/ipc/netty/channel/ChannelOperations.java @@ -119,14 +119,14 @@ public static Cha } } - final BiFunction> - handler; - final Channel channel; - final FluxReceive inbound; - final DirectProcessor onInactive; - final ContextHandler context; + final BiFunction> + handler; + final Channel channel; + final FluxReceive inbound; + final DirectProcessor onInactive; + final ContextHandler context; @SuppressWarnings("unchecked") - volatile Subscription outboundSubscription; + volatile Subscription outboundSubscription; protected ChannelOperations(Channel channel, ChannelOperations replaced) { this(channel, replaced.handler, replaced.context, replaced.onInactive); @@ -146,8 +146,16 @@ protected ChannelOperations(Channel channel, this.context = Objects.requireNonNull(context, "context"); this.inbound = new FluxReceive(this); this.onInactive = processor; + Subscription[] _s = new Subscription[1]; Mono.fromDirect(context.onCloseOrRelease(channel)) + .doOnSubscribe(s -> _s[0] = s) .subscribe(onInactive); + + if(_s[0] != null) { //remove closeFuture listener ref by onCloseOrRelease + // subscription when onInactive is called for any reason from + // onHandlerTerminate + onInactive.subscribe(null, null, _s[0]::cancel); + } } @Override