Skip to content

Commit

Permalink
fix #186 - drop out of sequence http client frames when server close
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Oct 18, 2017
1 parent 362da95 commit 5fd4a81
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
21 changes: 19 additions & 2 deletions src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
final HttpRequest request = (HttpRequest) msg;
if (persistentConnection) {
pendingResponses += 1;
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug("Increasing pending responses, now " +
"{}", pendingResponses);
}
persistentConnection = isKeepAlive(request);
}
else {
Expand All @@ -89,7 +93,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
return;
}
if (overflow || pendingResponses > 1) {
if (pendingResponses > 1) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug("buffering pipelined HTTP request, " +
"pending response count: {}, queue: {}",
Expand All @@ -109,6 +113,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
}
else if (persistentConnection && pendingResponses == 0) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug("Dropped HTTP content, " +
"Since response has been sent already:{}", msg);
}
ReferenceCountUtil.release(msg);
ctx.read();
return;
}
else if (overflow) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug("buffering pipelined HTTP content, " +
Expand Down Expand Up @@ -165,9 +178,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
ctx.write(msg, promise);

if(mustRecycleEncoder) {
if(persistentConnection && mustRecycleEncoder) {
mustRecycleEncoder = false;
pendingResponses -= 1;
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug("Decreasing pending responses, now " +
"{}", pendingResponses);
}
}

if (pipelined != null && !pipelined.isEmpty()) {
Expand Down
12 changes: 8 additions & 4 deletions src/test/java/reactor/ipc/netty/http/server/HttpServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -600,17 +600,21 @@ else if (close) {
public void testIssue186() {
NettyContext server =
HttpServer.create(0)
.newHandler((req, res) -> res.sendNotFound())
.newHandler((req, res) -> res.status(200).send())
.block(Duration.ofSeconds(300));

HttpClient client =
HttpClient.create(ops -> ops.connectAddress(() -> server.address())
.poolResources(PoolResources.fixed("test", 1)));

doTestIssue186(client);
doTestIssue186(client);
try {
doTestIssue186(client);
doTestIssue186(client);
}
finally {
server.dispose();
}

server.dispose();
}

private void doTestIssue186(HttpClient client) {
Expand Down

0 comments on commit 5fd4a81

Please sign in to comment.