Skip to content

Commit

Permalink
Merge #3090 into 1.1.17
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Mar 11, 2024
2 parents f1871f3 + 3caade4 commit 21d7af7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,20 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
Connection conn = Connection.from(ch);
if (ch.pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (ch.pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
ch.pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
if (conn.isPersistent()) {
conn.onTerminate().subscribe(null, null, () -> conn.removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
conn.addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,19 @@ else if (markSentBody()) {
}
listener().onStateChange(this, HttpClientState.REQUEST_SENT);
if (responseTimeout != null) {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (channel().pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (channel().pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
channel().pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (isPersistent()) {
onTerminate().subscribe(null, null, () -> removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
}
}
channel().read();
if (channel().parent() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -99,6 +100,7 @@
import static reactor.netty.Metrics.DATA_RECEIVED_TIME;
import static reactor.netty.Metrics.DATA_SENT;
import static reactor.netty.Metrics.DATA_SENT_TIME;
import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.ERRORS;
import static reactor.netty.Metrics.HTTP_CLIENT_PREFIX;
import static reactor.netty.Metrics.HTTP_SERVER_PREFIX;
Expand Down Expand Up @@ -731,12 +733,16 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
}
else {
// make sure the client stream is closed on the server side before checking server metrics
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(1);
assertGauge(registry, SERVER_STREAMS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -829,13 +835,17 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
}
else {
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -974,6 +984,27 @@ void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception {
}
}

@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testIssue3060ConnectTimeoutException(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
customizeClientOptions(httpClient, clientCtx, clientProtocols)
.remoteAddress(() -> new InetSocketAddress("1.1.1.1", 11111))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10)
.doOnChannelInit((o, c, address) -> c.closeFuture().addListener(f -> latch.countDown()))
.post()
.uri("/1")
.send(ByteBufFlux.fromString(Mono.just("hello")))
.responseContent()
.subscribe();

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

String[] summaryTags = new String[]{REMOTE_ADDRESS, "1.1.1.1:11111", STATUS, ERROR};
assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1);
}

static Stream<Arguments> combinationsIssue2956() {
return Stream.of(
// isCustomRecorder, isHttp2
Expand Down

0 comments on commit 21d7af7

Please sign in to comment.