Skip to content

Commit

Permalink
Turn channel to auto read on http inbound termination to detect close…
Browse files Browse the repository at this point in the history
… faster

Auto retry persistent http clients if they failed to write a newly acquired channel
  • Loading branch information
Stephane Maldini committed Mar 9, 2017
1 parent 10310a3 commit 96daef2
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package reactor.ipc.netty.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/reactor/ipc/netty/http/HttpOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ protected HttpOperations(Channel ioChannel,
BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler,
ContextHandler<?> context) {
super(ioChannel, handler, context);
//reset channel to manual read if re-used
ioChannel.config().setAutoRead(false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.channel.AbortedException;
import reactor.ipc.netty.channel.ContextHandler;
import reactor.ipc.netty.http.Cookies;
import reactor.ipc.netty.http.HttpOperations;
Expand Down Expand Up @@ -480,6 +481,15 @@ protected void onOutboundComplete() {
channel().read();
}

@Override
protected void onOutboundError(Throwable err) {
if(NettyContext.isPersistent(channel()) && responseState == null){
parentContext().fireContextError(err);
return;
}
super.onOutboundError(err);
}

@Override
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse) {
Expand Down Expand Up @@ -544,6 +554,8 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
super.onInboundNext(ctx, msg);
}
//force auto read to enable more accurate close selection now inbound is done
channel().config().setAutoRead(true);
onHandlerTerminate();
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.channel.AbortedException;

/**
* @author Stephane Maldini
Expand Down Expand Up @@ -175,6 +176,10 @@ public boolean test(Throwable throwable) {
redirect(re.location);
return true;
}
if (AbortedException.isConnectionReset(throwable)) {
redirect(activeURI.toString());
return true;
}
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,7 @@

package reactor.ipc.netty.http.server;

import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
Expand All @@ -33,7 +30,6 @@
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.ByteBufFlux;

/**
* @author Stephane Maldini
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
Expand Down Expand Up @@ -364,10 +365,11 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (isOutboundDone()) {
onHandlerTerminate();
}
return;
}
if (isOutboundDone()) {
onOutboundReadMore();
else {
//force auto read to enable more accurate close selection now inbound is done
channel().config()
.setAutoRead(true);
}
}
}
else {
Expand Down Expand Up @@ -406,15 +408,6 @@ protected void onOutboundComplete() {
});
}

final void onOutboundReadMore() {
if (isKeepAlive()) {
if (log.isDebugEnabled()) {
log.debug("Consuming keep-alive connection, prepare to ignore extra " + "frames");
}
channel().read();
}
}

@Override
protected void onOutboundError(Throwable err) {

Expand All @@ -430,24 +423,20 @@ protected void onOutboundError(Throwable err) {
HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers()
.setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
.setInt(HttpHeaderNames.CONTENT_LENGTH, 0)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
channel().writeAndFlush(response)
.addListener(r -> onHandlerTerminate());
onOutboundReadMore();
.addListener(ChannelFutureListener.CLOSE);
return;
}

if (HttpUtil.isContentLengthSet(nettyResponse)) {
channel().writeAndFlush(EMPTY_BUFFER)
.addListener(r -> onHandlerTerminate());
onOutboundReadMore();
channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
.addListener(ChannelFutureListener.CLOSE);
return;
}
channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
.addListener(r -> onHandlerTerminate());
if (isKeepAlive()) {
onOutboundReadMore();
}
.addListener(ChannelFutureListener.CLOSE);
}

@Override
Expand Down

0 comments on commit 96daef2

Please sign in to comment.