From 895845ce4013339dc34e7c8f4f6eb922d95b73d2 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 31 Oct 2017 20:25:04 +0200 Subject: [PATCH] fix #203: Introduce flush on each with event loop option. --- .../java/reactor/ipc/netty/NettyPipeline.java | 16 ++++++- .../channel/ChannelOperationsHandler.java | 42 ++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/NettyPipeline.java b/src/main/java/reactor/ipc/netty/NettyPipeline.java index caae591406..6efc2fe225 100644 --- a/src/main/java/reactor/ipc/netty/NettyPipeline.java +++ b/src/main/java/reactor/ipc/netty/NettyPipeline.java @@ -83,10 +83,24 @@ interface SendOptions { /** * Make the underlying channel flush item by item. + * Flush operation will be executed at some time in the future. * * @return this builder */ - SendOptions flushOnEach(); + default SendOptions flushOnEach() { + return flushOnEach(true); + } + + /** + * Make the underlying channel flush item by item. + * Whether flush operation is executed immediately + * or not is specified by withEventLoop parameter. + * + * @param withEventLoop flag specifying whether flush operation + * will be executed immediately or at some time in the future + * @return this builder + */ + SendOptions flushOnEach(boolean withEventLoop); } diff --git a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java index 1c75a675ad..f70f8a25d3 100644 --- a/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/ChannelOperationsHandler.java @@ -73,6 +73,7 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler Queue pendingWrites; ChannelHandlerContext ctx; boolean flushOnEach; + boolean flushOnEachWithEventLoop; long pendingBytes; ContextHandler lastContext; @@ -82,6 +83,7 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler volatile boolean innerActive; volatile boolean removed; volatile int wip; + volatile long scheduledFlush; @SuppressWarnings("unchecked") ChannelOperationsHandler(ContextHandler contextHandler) { @@ -257,8 +259,9 @@ public NettyPipeline.SendOptions flushOnBoundary() { } @Override - public NettyPipeline.SendOptions flushOnEach() { + public NettyPipeline.SendOptions flushOnEach(boolean withEventLoop) { flushOnEach = true; + flushOnEachWithEventLoop = withEventLoop; return this; } @@ -276,7 +279,21 @@ ChannelFuture doWrite(Object msg, ChannelPromise promise, PublisherSender inner) .isWritable() //force flush if write buffer full ) { pendingBytes = 0L; - return ctx.writeAndFlush(msg, promise); + + ChannelFuture future = ctx.write(msg, promise); + if (flushOnEachWithEventLoop && ctx.channel().isWritable()) { + EventLoop eventLoop = ctx.channel().eventLoop(); + if (eventLoop.inEventLoop()) { + scheduleFlush(); + } + else { + eventLoop.execute(() -> scheduleFlush()); + } + } + else { + ctx.flush(); + } + return future; } else { if (msg instanceof ByteBuf) { @@ -303,6 +320,25 @@ else if (msg instanceof FileRegion) { } } + void scheduleFlush() { + if (SCHEDULED_FLUSH.getAndIncrement(this) == 0) { + ctx.channel() + .eventLoop() + .execute(() -> { + long missed = scheduledFlush; + for(;;) { + if (hasPendingWriteBytes()) { + ctx.flush(); + } + missed = SCHEDULED_FLUSH.addAndGet(this, -missed); + if (missed == 0) { + break; + } + } + }); + } + } + void discard() { for (; ; ) { if (pendingWrites == null || pendingWrites.isEmpty()) { @@ -785,6 +821,8 @@ final void produced(long n) { @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(ChannelOperationsHandler.class, "wip"); + static final AtomicLongFieldUpdater SCHEDULED_FLUSH = + AtomicLongFieldUpdater.newUpdater(ChannelOperationsHandler.class, "scheduledFlush"); static final Logger log = Loggers.getLogger(ChannelOperationsHandler.class);