Skip to content

Commit

Permalink
fix #203: Introduce flush on each with event loop option.
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 14, 2017
1 parent 8c19d46 commit 895845c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
16 changes: 15 additions & 1 deletion src/main/java/reactor/ipc/netty/NettyPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,24 @@ interface SendOptions {

/**
* Make the underlying channel flush item by item.
* Flush operation will be executed at some time in the future.
*
* @return this builder
*/
SendOptions flushOnEach();
default SendOptions flushOnEach() {
return flushOnEach(true);
}

/**
* Make the underlying channel flush item by item.
* Whether flush operation is executed immediately
* or not is specified by <code>withEventLoop</code> parameter.
*
* @param withEventLoop flag specifying whether flush operation
* will be executed immediately or at some time in the future
* @return this builder
*/
SendOptions flushOnEach(boolean withEventLoop);


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
Queue<?> pendingWrites;
ChannelHandlerContext ctx;
boolean flushOnEach;
boolean flushOnEachWithEventLoop;

long pendingBytes;
ContextHandler<?> lastContext;
Expand All @@ -82,6 +83,7 @@ final class ChannelOperationsHandler extends ChannelDuplexHandler
volatile boolean innerActive;
volatile boolean removed;
volatile int wip;
volatile long scheduledFlush;

@SuppressWarnings("unchecked")
ChannelOperationsHandler(ContextHandler<?> contextHandler) {
Expand Down Expand Up @@ -257,8 +259,9 @@ public NettyPipeline.SendOptions flushOnBoundary() {
}

@Override
public NettyPipeline.SendOptions flushOnEach() {
public NettyPipeline.SendOptions flushOnEach(boolean withEventLoop) {
flushOnEach = true;
flushOnEachWithEventLoop = withEventLoop;
return this;
}

Expand All @@ -276,7 +279,21 @@ ChannelFuture doWrite(Object msg, ChannelPromise promise, PublisherSender inner)
.isWritable() //force flush if write buffer full
) {
pendingBytes = 0L;
return ctx.writeAndFlush(msg, promise);

ChannelFuture future = ctx.write(msg, promise);
if (flushOnEachWithEventLoop && ctx.channel().isWritable()) {
EventLoop eventLoop = ctx.channel().eventLoop();
if (eventLoop.inEventLoop()) {
scheduleFlush();
}
else {
eventLoop.execute(() -> scheduleFlush());
}
}
else {
ctx.flush();
}
return future;
}
else {
if (msg instanceof ByteBuf) {
Expand All @@ -303,6 +320,25 @@ else if (msg instanceof FileRegion) {
}
}

void scheduleFlush() {
if (SCHEDULED_FLUSH.getAndIncrement(this) == 0) {
ctx.channel()
.eventLoop()
.execute(() -> {
long missed = scheduledFlush;
for(;;) {
if (hasPendingWriteBytes()) {
ctx.flush();
}
missed = SCHEDULED_FLUSH.addAndGet(this, -missed);
if (missed == 0) {
break;
}
}
});
}
}

void discard() {
for (; ; ) {
if (pendingWrites == null || pendingWrites.isEmpty()) {
Expand Down Expand Up @@ -785,6 +821,8 @@ final void produced(long n) {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ChannelOperationsHandler> WIP =
AtomicIntegerFieldUpdater.newUpdater(ChannelOperationsHandler.class, "wip");
static final AtomicLongFieldUpdater<ChannelOperationsHandler> SCHEDULED_FLUSH =
AtomicLongFieldUpdater.newUpdater(ChannelOperationsHandler.class, "scheduledFlush");
static final Logger log =
Loggers.getLogger(ChannelOperationsHandler.class);

Expand Down

0 comments on commit 895845c

Please sign in to comment.