diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/Http2SettingsSpec.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/Http2SettingsSpec.java index ce61495ffe..29525d3098 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/Http2SettingsSpec.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/Http2SettingsSpec.java @@ -78,6 +78,14 @@ public interface Builder { */ Builder maxHeaderListSize(long maxHeaderListSize); + /** + * The connection is marked for closing once the number of all-time streams reaches {@code maxStreams}. + * + * @return {@code this} + * @since 1.0.33 + */ + Builder maxStreams(long maxStreams); + /** * Sets the {@code SETTINGS_ENABLE_PUSH} value. * @@ -147,6 +155,17 @@ public Long maxHeaderListSize() { return maxHeaderListSize; } + /** + * Returns the configured {@code maxStreams} value or null. + * + * @return the configured {@code maxStreams} value or null + * @since 1.0.33 + */ + @Nullable + public Long maxStreams() { + return maxStreams; + } + /** * Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null. * @@ -170,6 +189,7 @@ public boolean equals(Object o) { Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) && Objects.equals(maxFrameSize, that.maxFrameSize) && maxHeaderListSize.equals(that.maxHeaderListSize) && + Objects.equals(maxStreams, that.maxStreams) && Objects.equals(pushEnabled, that.pushEnabled); } @@ -181,6 +201,7 @@ public int hashCode() { result = 31 * result + Long.hashCode(maxConcurrentStreams); result = 31 * result + maxFrameSize; result = 31 * result + Long.hashCode(maxHeaderListSize); + result = 31 * result + Long.hashCode(maxStreams); result = 31 * result + Boolean.hashCode(pushEnabled); return result; } @@ -190,19 +211,28 @@ public int hashCode() { final Long maxConcurrentStreams; final Integer maxFrameSize; final Long maxHeaderListSize; + final Long maxStreams; final Boolean pushEnabled; Http2SettingsSpec(Build build) { Http2Settings settings = build.http2Settings; headerTableSize = settings.headerTableSize(); initialWindowSize = settings.initialWindowSize(); - maxConcurrentStreams = settings.maxConcurrentStreams(); + if (settings.maxConcurrentStreams() != null) { + maxConcurrentStreams = build.maxStreams != null ? + Math.min(settings.maxConcurrentStreams(), build.maxStreams) : settings.maxConcurrentStreams(); + } + else { + maxConcurrentStreams = build.maxStreams; + } maxFrameSize = settings.maxFrameSize(); maxHeaderListSize = settings.maxHeaderListSize(); + maxStreams = build.maxStreams; pushEnabled = settings.pushEnabled(); } static final class Build implements Builder { + Long maxStreams; final Http2Settings http2Settings = Http2Settings.defaultSettings(); @Override @@ -240,6 +270,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) { return this; } + @Override + public Builder maxStreams(long maxStreams) { + if (maxStreams < 1) { + throw new IllegalArgumentException("maxStreams must be positive"); + } + this.maxStreams = Long.valueOf(maxStreams); + return this; + } + /* @Override public Builder pushEnabled(boolean pushEnabled) { diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java index 5fe8faac87..cf9a433bdb 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java @@ -30,12 +30,14 @@ import io.netty5.handler.codec.http.HttpServerUpgradeHandler; import io.netty5.handler.codec.http2.CleartextHttp2ServerUpgradeHandler; import io.netty5.handler.codec.http2.Http2CodecUtil; +import io.netty5.handler.codec.http2.Http2ConnectionAdapter; import io.netty5.handler.codec.http2.Http2FrameCodec; import io.netty5.handler.codec.http2.Http2FrameCodecBuilder; import io.netty5.handler.codec.http2.Http2FrameLogger; import io.netty5.handler.codec.http2.Http2MultiplexHandler; import io.netty5.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty5.handler.codec.http2.Http2Settings; +import io.netty5.handler.codec.http2.Http2Stream; import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty5.handler.logging.LogLevel; import io.netty5.handler.logging.LoggingHandler; @@ -341,7 +343,7 @@ else if (p == HttpProtocol.H2C) { this._protocols = _protocols; } - Http2Settings http2Settings() { + static Http2Settings http2Settings(@Nullable Http2SettingsSpec http2Settings) { Http2Settings settings = Http2Settings.defaultSettings(); if (http2Settings != null) { @@ -489,7 +491,7 @@ static void configureH2Pipeline(ChannelPipeline p, boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, @Nullable Duration idleTimeout, ConnectionObserver listener, @@ -504,11 +506,16 @@ static void configureH2Pipeline(ChannelPipeline p, Http2FrameCodecBuilder http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) - .initialSettings(http2Settings); + .initialSettings(http2Settings(http2SettingsSpec)); - if (enableGracefulShutdown) { - // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + Long maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null; + if (enableGracefulShutdown || maxStreams != null) { + // 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout // when disposeNow(timeout) is invoked + // 2. When 'maxStreams' is configured, the graceful shutdown is enabled. + // The graceful shutdown is configured with indefinite timeout because + // the response time is controlled by the user and might be different + // for the different requests. http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); } @@ -517,7 +524,11 @@ static void configureH2Pipeline(ChannelPipeline p, "reactor.netty5.http.server.h2")); } - p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) + Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build(); + if (maxStreams != null) { + http2FrameCodec.connection().addListener(new H2ConnectionListener(p.channel(), maxStreams)); + } + p.addLast(NettyPipeline.HttpCodec, http2FrameCodec) .addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressPredicate, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, @@ -543,7 +554,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, @Nullable Duration idleTimeout, ConnectionObserver listener, @@ -560,10 +571,10 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, - forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder, + forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); - ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader); + ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( httpServerCodec, new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()), @@ -742,29 +753,39 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { final Http11OrH2CleartextCodec upgrader; final boolean addHttp2FrameCodec; final boolean removeMetricsHandler; + final Long maxStreams; /** * Used when full H2 preface is received */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader) { - this(upgrader, true, true); + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams) { + this(upgrader, true, true, maxStreams); } /** * Used when upgrading from HTTP/1.1 to H2. When an upgrade happens {@link Http2FrameCodec} * is added by {@link Http2ServerUpgradeCodec} */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler) { + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler, + @Nullable Long maxStreams) { this.upgrader = upgrader; this.addHttp2FrameCodec = addHttp2FrameCodec; this.removeMetricsHandler = removeMetricsHandler; + this.maxStreams = maxStreams; } @Override public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline pipeline = ctx.pipeline(); if (addHttp2FrameCodec) { - pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build()); + Http2FrameCodec http2FrameCodec = upgrader.http2FrameCodecBuilder.build(); + if (maxStreams != null) { + http2FrameCodec.connection().addListener(new H2ConnectionListener(ctx.channel(), maxStreams)); + } + pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec); + } + else if (maxStreams != null) { + pipeline.get(Http2FrameCodec.class).connection().addListener(new H2ConnectionListener(ctx.channel(), maxStreams)); } // Add this handler at the end of the pipeline as it does not forward all channelRead events @@ -875,6 +896,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; + final Long maxStreams; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; final ChannelOperations.OnSetup opsFactory; @@ -888,7 +910,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, - Http2Settings http2Settings, + @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, ConnectionObserver listener, @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle, @@ -905,11 +927,16 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) - .initialSettings(http2Settings); + .initialSettings(http2Settings(http2SettingsSpec)); - if (enableGracefulShutdown) { - // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + this.maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null; + if (enableGracefulShutdown || maxStreams != null) { + // 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout // when disposeNow(timeout) is invoked + // 2. When 'maxStreams' is configured, the graceful shutdown is enabled. + // The graceful shutdown is configured with indefinite timeout because + // the response time is controlled by the user and might be different + // for the different requests. http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); } @@ -942,7 +969,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false)); + return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false, maxStreams)); } else { return null; @@ -950,6 +977,29 @@ public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protoc } } + static final class H2ConnectionListener extends Http2ConnectionAdapter { + + final Channel channel; + final long maxStreams; + + long numStreams; + + H2ConnectionListener(Channel channel, long maxStreams) { + this.channel = channel; + this.maxStreams = maxStreams; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void onStreamActive(Http2Stream stream) { + assert channel.executor().inEventLoop(); + if (++numStreams == maxStreams) { + //"FutureReturnValueIgnored" this is deliberate + channel.close(); + } + } + } + static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler { final boolean accessLogEnabled; @@ -959,7 +1009,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2Settings http2Settings; + final Http2SettingsSpec http2SettingsSpec; final HttpMessageLogFactory httpMessageLogFactory; final Duration idleTimeout; final ConnectionObserver listener; @@ -980,7 +1030,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.enableGracefulShutdown = initializer.enableGracefulShutdown; this.formDecoderProvider = initializer.formDecoderProvider; this.forwardedHeaderHandler = initializer.forwardedHeaderHandler; - this.http2Settings = initializer.http2Settings; + this.http2SettingsSpec = initializer.http2SettingsSpec; this.httpMessageLogFactory = initializer.httpMessageLogFactory; this.idleTimeout = initializer.idleTimeout; this.listener = listener; @@ -1002,7 +1052,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, - enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); return; } @@ -1032,7 +1082,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2Settings http2Settings; + final Http2SettingsSpec http2SettingsSpec; final HttpMessageLogFactory httpMessageLogFactory; final Duration idleTimeout; final BiFunction, ? super Connection, ? extends Mono> @@ -1055,7 +1105,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.enableGracefulShutdown = config.channelGroup() != null; this.formDecoderProvider = config.formDecoderProvider; this.forwardedHeaderHandler = config.forwardedHeaderHandler; - this.http2Settings = config.http2Settings(); + this.http2SettingsSpec = config.http2Settings; this.httpMessageLogFactory = config.httpMessageLogFactory; this.idleTimeout = config.idleTimeout; this.mapHandle = config.mapHandle; @@ -1117,7 +1167,7 @@ else if ((protocols & h2) == h2) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, @@ -1140,7 +1190,7 @@ else if ((protocols & h2) == h2) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, @@ -1178,7 +1228,7 @@ else if ((protocols & h2c) == h2c) { enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, - http2Settings, + http2SettingsSpec, httpMessageLogFactory, idleTimeout, observer, diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2SettingsSpecTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2SettingsSpecTests.java index c6c64863c3..8e588ff1f8 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2SettingsSpecTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2SettingsSpecTests.java @@ -40,6 +40,7 @@ void headerTableSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -59,6 +60,7 @@ void initialWindowSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -78,6 +80,7 @@ void maxConcurrentStreams() { assertThat(spec.maxConcurrentStreams()).isEqualTo(123); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -97,6 +100,7 @@ void maxFrameSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isEqualTo(16384); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -116,6 +120,7 @@ void maxHeaderListSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(123); + assertThat(spec.maxStreams()).isNull(); assertThat(spec.pushEnabled()).isNull(); } @@ -126,6 +131,52 @@ void maxHeaderListSizeBadValues() { .withMessageContaining("Setting MAX_HEADER_LIST_SIZE is invalid: -1"); } + @Test + public void maxStreamsNoMaxConcurrentStreams() { + builder.maxStreams(123); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsWithMaxConcurrentStreams_1() { + builder.maxStreams(123).maxConcurrentStreams(456); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isEqualTo(123); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsWithMaxConcurrentStreams_2() { + builder.maxStreams(456).maxConcurrentStreams(123); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isEqualTo(123); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.maxStreams()).isEqualTo(456); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + public void maxStreamsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxStreams(-1)) + .withMessageContaining("maxStreams must be positive"); + } + /* @Test public void pushEnabled() { diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2Tests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2Tests.java index 78af05bf46..5d0b7960c1 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2Tests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/Http2Tests.java @@ -16,15 +16,21 @@ package reactor.netty5.http; import io.netty5.buffer.Buffer; +import io.netty5.handler.codec.http2.Http2Connection; +import io.netty5.handler.codec.http2.Http2FrameCodec; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; import io.netty5.handler.ssl.util.SelfSignedCertificate; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import reactor.netty5.BaseHttpTest; import reactor.netty5.BufferFlux; @@ -51,6 +57,7 @@ import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty5.ConnectionObserver.State.CONFIGURED; /** * Holds HTTP/2 specific tests. @@ -626,4 +633,66 @@ private static void doTestPR2659_SchemeHttps(Predicate predicate) .verify(Duration.ofSeconds(30)); } + @ParameterizedTest + @MethodSource("h2cCompatibleCombinations") + void testMaxStreamsH2C(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + ConnectionProvider provider = ConnectionProvider.create("testMaxStreamsH2C", 1); + try { + doTestMaxStreams(createServer().protocol(serverProtocols), + createClient(provider, () -> disposableServer.address()).protocol(clientProtocols)); + } + finally { + provider.disposeLater().block(Duration.ofSeconds(5)); + } + } + + @ParameterizedTest + @MethodSource("h2CompatibleCombinations") + void testMaxStreamsH2(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + ConnectionProvider provider = ConnectionProvider.create("testMaxStreamsH2", 1); + try { + doTestMaxStreams(createServer().protocol(serverProtocols).secure(spec -> spec.sslContext(serverCtx)), + createClient(provider, () -> disposableServer.address()).protocol(clientProtocols).secure(spec -> spec.sslContext(clientCtx))); + } + finally { + provider.disposeLater().block(Duration.ofSeconds(5)); + } + } + + private void doTestMaxStreams(HttpServer server, HttpClient client) { + Sinks.One goAwaySent = Sinks.one(); + disposableServer = + server.childObserve((conn, state) -> { + if (state == CONFIGURED) { + Http2FrameCodec http2FrameCodec = conn.channel().parent().pipeline().get(Http2FrameCodec.class); + Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class); + Mockito.doAnswer(invocation -> { + goAwaySent.tryEmitValue("goAwaySent"); + return null; + }) + .when(goAwayFrameListener) + .onGoAwaySent(Mockito.anyInt(), Mockito.anyLong(), Mockito.any()); + http2FrameCodec.connection().addListener(goAwayFrameListener); + } + }) + .http2Settings(spec -> spec.maxStreams(2)) + .handle((req, res) -> res.sendString(Mono.just("doTestMaxStreams"))) + .bindNow(); + + Flux.range(0, 2) + .flatMap(i -> + client.get() + .uri("/") + .responseSingle((res, bytes) -> bytes.asString())) + .collectList() + .zipWith(goAwaySent.asMono()) + .as(StepVerifier::create) + .assertNext(t -> assertThat(t.getT1()).isNotNull().hasSize(2).allMatch("doTestMaxStreams"::equals)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } }