From 4f9acc0e05a20cffe03967922fd508464474442a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 31 May 2023 15:03:47 +0300 Subject: [PATCH] Add API for specifying max streams per connection (#2817) The connection is marked for closing once the number of all-time streams reaches max streams configuration. Fixes #2769 --- reactor-netty-http/build.gradle | 1 + .../reactor/netty/http/Http2SettingsSpec.java | 41 +++++++- .../netty/http/server/HttpServerConfig.java | 96 ++++++++++++++----- .../netty/http/Http2SettingsSpecTests.java | 51 ++++++++++ .../java/reactor/netty/http/Http2Tests.java | 69 +++++++++++++ 5 files changed, 232 insertions(+), 26 deletions(-) diff --git a/reactor-netty-http/build.gradle b/reactor-netty-http/build.gradle index 8d6c992c2b..4502693fed 100644 --- a/reactor-netty-http/build.gradle +++ b/reactor-netty-http/build.gradle @@ -235,6 +235,7 @@ task japicmp(type: JapicmpTask) { compatibilityChangeExcludes = [ "METHOD_NEW_DEFAULT" ] methodExcludes = [ + 'reactor.netty.http.Http2SettingsSpec$Builder#maxStreams(long)' ] } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java index 5c97f43207..977a2256c4 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java @@ -78,6 +78,14 @@ public interface Builder { */ Builder maxHeaderListSize(long maxHeaderListSize); + /** + * The connection is marked for closing once the number of all-time streams reaches {@code maxStreams}. + * + * @return {@code this} + * @since 1.0.33 + */ + Builder maxStreams(long maxStreams); + /** * Sets the {@code SETTINGS_ENABLE_PUSH} value. * @@ -147,6 +155,17 @@ public Long maxHeaderListSize() { return maxHeaderListSize; } + /** + * Returns the configured {@code maxStreams} value or null. + * + * @return the configured {@code maxStreams} value or null + * @since 1.0.33 + */ + @Nullable + public Long maxStreams() { + return maxStreams; + } + /** * Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null. * @@ -171,6 +190,7 @@ public boolean equals(Object o) { Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) && Objects.equals(maxFrameSize, that.maxFrameSize) && maxHeaderListSize.equals(that.maxHeaderListSize) && + Objects.equals(maxStreams, that.maxStreams) && Objects.equals(pushEnabled, that.pushEnabled); } @@ -182,6 +202,7 @@ public int hashCode() { result = 31 * result + Long.hashCode(maxConcurrentStreams); result = 31 * result + maxFrameSize; result = 31 * result + Long.hashCode(maxHeaderListSize); + result = 31 * result + Long.hashCode(maxStreams); result = 31 * result + Boolean.hashCode(pushEnabled); return result; } @@ -191,19 +212,28 @@ public int hashCode() { final Long maxConcurrentStreams; final Integer maxFrameSize; final Long maxHeaderListSize; + final Long maxStreams; final Boolean pushEnabled; Http2SettingsSpec(Build build) { Http2Settings settings = build.http2Settings; headerTableSize = settings.headerTableSize(); initialWindowSize = settings.initialWindowSize(); - maxConcurrentStreams = settings.maxConcurrentStreams(); + if (settings.maxConcurrentStreams() != null) { + maxConcurrentStreams = build.maxStreams != null ? + Math.min(settings.maxConcurrentStreams(), build.maxStreams) : settings.maxConcurrentStreams(); + } + else { + maxConcurrentStreams = build.maxStreams; + } maxFrameSize = settings.maxFrameSize(); maxHeaderListSize = settings.maxHeaderListSize(); + maxStreams = build.maxStreams; pushEnabled = settings.pushEnabled(); } static final class Build implements Builder { + Long maxStreams; final Http2Settings http2Settings = Http2Settings.defaultSettings(); @Override @@ -241,6 +271,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) { return this; } + @Override + public Builder maxStreams(long maxStreams) { + if (maxStreams < 1) { + throw new IllegalArgumentException("maxStreams must be positive"); + } + this.maxStreams = Long.valueOf(maxStreams); + return this; + } + /* @Override public Builder pushEnabled(boolean pushEnabled) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 03407468c6..8936466f8b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -32,12 +32,14 @@ import io.netty.handler.codec.http.cookie.ServerCookieEncoder; import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler; import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2MultiplexHandler; import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -366,7 +368,7 @@ else if (p == HttpProtocol.H2C) { this._protocols = _protocols; } - Http2Settings http2Settings() { + static Http2Settings http2Settings(@Nullable Http2SettingsSpec http2Settings) { Http2Settings settings = Http2Settings.defaultSettings(); if (http2Settings != null) { @@ -506,7 +508,7 @@ static void configureH2Pipeline(ChannelPipeline p, boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, @Nullable Duration idleTimeout, ConnectionObserver listener, @@ -521,11 +523,16 @@ static void configureH2Pipeline(ChannelPipeline p, Http2FrameCodecBuilder http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) - .initialSettings(http2Settings); + .initialSettings(http2Settings(http2SettingsSpec)); - if (enableGracefulShutdown) { - // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + Long maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null; + if (enableGracefulShutdown || maxStreams != null) { + // 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout // when disposeNow(timeout) is invoked + // 2. When 'maxStreams' is configured, the graceful shutdown is enabled. + // The graceful shutdown is configured with indefinite timeout because + // the response time is controlled by the user and might be different + // for the different requests. http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); } @@ -534,7 +541,11 @@ static void configureH2Pipeline(ChannelPipeline p, "reactor.netty.http.server.h2")); } - p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) + Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build(); + if (maxStreams != null) { + http2FrameCodec.connection().addListener(new H2ConnectionListener(p.channel(), maxStreams)); + } + p.addLast(NettyPipeline.HttpCodec, http2FrameCodec) .addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, @@ -562,7 +573,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, @Nullable Duration idleTimeout, ConnectionObserver listener, @@ -579,10 +590,10 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, - forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder, + forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); - ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader); + ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( httpServerCodec, new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()), @@ -751,27 +762,33 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { final Http11OrH2CleartextCodec upgrader; final boolean addHttp2FrameCodec; final boolean removeMetricsHandler; + final Long maxStreams; /** * Used when full H2 preface is received */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader) { - this(upgrader, true, true); + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams) { + this(upgrader, true, true, maxStreams); } /** * Used when upgrading from HTTP/1.1 to H2. When an upgrade happens {@link Http2FrameCodec} * is added by {@link Http2ServerUpgradeCodec} */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler) { + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler, + @Nullable Long maxStreams) { this.upgrader = upgrader; this.addHttp2FrameCodec = addHttp2FrameCodec; this.removeMetricsHandler = removeMetricsHandler; + this.maxStreams = maxStreams; } @Override public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline pipeline = ctx.pipeline(); + if (maxStreams != null) { + upgrader.http2FrameCodec.connection().addListener(new H2ConnectionListener(ctx.channel(), maxStreams)); + } if (addHttp2FrameCodec) { pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec); } @@ -876,6 +893,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; + final Long maxStreams; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; final ChannelOperations.OnSetup opsFactory; @@ -891,7 +909,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, ConnectionObserver listener, @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle, @@ -910,11 +928,16 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer Http2FrameCodecBuilder http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) - .initialSettings(http2Settings); + .initialSettings(http2Settings(http2SettingsSpec)); - if (enableGracefulShutdown) { - // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + this.maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null; + if (enableGracefulShutdown || maxStreams != null) { + // 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout // when disposeNow(timeout) is invoked + // 2. When 'maxStreams' is configured, the graceful shutdown is enabled. + // The graceful shutdown is configured with indefinite timeout because + // the response time is controlled by the user and might be different + // for the different requests. http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); } @@ -948,7 +971,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false)); + return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false, maxStreams)); } else { return null; @@ -956,6 +979,29 @@ public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protoc } } + static final class H2ConnectionListener extends Http2ConnectionAdapter { + + final Channel channel; + final long maxStreams; + + long numStreams; + + H2ConnectionListener(Channel channel, long maxStreams) { + this.channel = channel; + this.maxStreams = maxStreams; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void onStreamActive(Http2Stream stream) { + assert channel.eventLoop().inEventLoop(); + if (++numStreams == maxStreams) { + //"FutureReturnValueIgnored" this is deliberate + channel.close(); + } + } + } + static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler { final boolean accessLogEnabled; @@ -967,7 +1013,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2Settings http2Settings; + final Http2SettingsSpec http2SettingsSpec; final HttpMessageLogFactory httpMessageLogFactory; final Duration idleTimeout; final ConnectionObserver listener; @@ -990,7 +1036,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.enableGracefulShutdown = initializer.enableGracefulShutdown; this.formDecoderProvider = initializer.formDecoderProvider; this.forwardedHeaderHandler = initializer.forwardedHeaderHandler; - this.http2Settings = initializer.http2Settings; + this.http2SettingsSpec = initializer.http2SettingsSpec; this.httpMessageLogFactory = initializer.httpMessageLogFactory; this.idleTimeout = initializer.idleTimeout; this.listener = listener; @@ -1012,7 +1058,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, - enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); return; } @@ -1044,7 +1090,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2Settings http2Settings; + final Http2SettingsSpec http2SettingsSpec; final HttpMessageLogFactory httpMessageLogFactory; final Duration idleTimeout; final BiFunction, ? super Connection, ? extends Mono> @@ -1069,7 +1115,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.enableGracefulShutdown = config.channelGroup() != null; this.formDecoderProvider = config.formDecoderProvider; this.forwardedHeaderHandler = config.forwardedHeaderHandler; - this.http2Settings = config.http2Settings(); + this.http2SettingsSpec = config.http2Settings; this.httpMessageLogFactory = config.httpMessageLogFactory; this.idleTimeout = config.idleTimeout; this.mapHandle = config.mapHandle; @@ -1137,7 +1183,7 @@ else if ((protocols & h2) == h2) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, @@ -1162,7 +1208,7 @@ else if ((protocols & h2) == h2) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, @@ -1204,7 +1250,7 @@ else if ((protocols & h2c) == h2c) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java index 813259adc9..f5129a7220 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java @@ -40,6 +40,7 @@ void headerTableSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -59,6 +60,7 @@ void initialWindowSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -78,6 +80,7 @@ void maxConcurrentStreams() { assertThat(spec.maxConcurrentStreams()).isEqualTo(123); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -97,6 +100,7 @@ void maxFrameSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isEqualTo(16384); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -116,6 +120,7 @@ void maxHeaderListSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(123); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -126,6 +131,52 @@ void maxHeaderListSizeBadValues() { .withMessageContaining("Setting MAX_HEADER_LIST_SIZE is invalid: -1"); } + @Test + public void maxStreamsNoMaxConcurrentStreams() { + builder.maxStreams(123); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsWithMaxConcurrentStreams_1() { + builder.maxStreams(123).maxConcurrentStreams(456); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isEqualTo(123); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsWithMaxConcurrentStreams_2() { + builder.maxStreams(456).maxConcurrentStreams(123); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isEqualTo(456); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxStreams(-1)) + .withMessageContaining("maxStreams must be positive"); + } + /* @Test public void pushEnabled() { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java index 5d6926b41e..d19614d5be 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java @@ -16,15 +16,21 @@ package reactor.netty.http; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import reactor.netty.BaseHttpTest; import reactor.netty.ByteBufFlux; @@ -51,6 +57,7 @@ import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.ConnectionObserver.State.CONFIGURED; /** * Holds HTTP/2 specific tests. @@ -626,4 +633,66 @@ private static void doTestPR2659_SchemeHttps(Predicate predicate) { .verify(Duration.ofSeconds(30)); } + @ParameterizedTest + @MethodSource("h2cCompatibleCombinations") + void testMaxStreamsH2C(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + ConnectionProvider provider = ConnectionProvider.create("testMaxStreamsH2C", 1); + try { + doTestMaxStreams(createServer().protocol(serverProtocols), + createClient(provider, () -> disposableServer.address()).protocol(clientProtocols)); + } + finally { + provider.disposeLater().block(Duration.ofSeconds(5)); + } + } + + @ParameterizedTest + @MethodSource("h2CompatibleCombinations") + void testMaxStreamsH2(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + ConnectionProvider provider = ConnectionProvider.create("testMaxStreamsH2", 1); + try { + doTestMaxStreams(createServer().protocol(serverProtocols).secure(spec -> spec.sslContext(serverCtx)), + createClient(provider, () -> disposableServer.address()).protocol(clientProtocols).secure(spec -> spec.sslContext(clientCtx))); + } + finally { + provider.disposeLater().block(Duration.ofSeconds(5)); + } + } + + private void doTestMaxStreams(HttpServer server, HttpClient client) { + Sinks.One goAwaySent = Sinks.one(); + disposableServer = + server.childObserve((conn, state) -> { + if (state == CONFIGURED) { + Http2FrameCodec http2FrameCodec = conn.channel().parent().pipeline().get(Http2FrameCodec.class); + Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class); + Mockito.doAnswer(invocation -> { + goAwaySent.tryEmitValue("goAwaySent"); + return null; + }) + .when(goAwayFrameListener) + .onGoAwaySent(Mockito.anyInt(), Mockito.anyLong(), Mockito.any()); + http2FrameCodec.connection().addListener(goAwayFrameListener); + } + }) + .http2Settings(spec -> spec.maxStreams(2)) + .handle((req, res) -> res.sendString(Mono.just("doTestMaxStreams"))) + .bindNow(); + + Flux.range(0, 2) + .flatMap(i -> + client.get() + .uri("/") + .responseSingle((res, bytes) -> bytes.asString())) + .collectList() + .zipWith(goAwaySent.asMono()) + .as(StepVerifier::create) + .assertNext(t -> assertThat(t.getT1()).isNotNull().hasSize(2).allMatch("doTestMaxStreams"::equals)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } }