From bf66a1a7cc9c048d789f7703af489965e178f95d Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 9 Dec 2024 19:23:42 +0200 Subject: [PATCH] HTTP/3: Ensure HttpClient sends full request when the send function does not change NettyOutbound (#3536) --- .../java/reactor/netty/http/Http3Tests.java | 166 +++++++++++++++--- .../http/client/Http3ConnectionProvider.java | 2 - 2 files changed, 138 insertions(+), 30 deletions(-) diff --git a/reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java b/reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java index 1bb7b949d4..401c4c7da0 100644 --- a/reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java +++ b/reactor-netty-http/src/http3Test/java/reactor/netty/http/Http3Tests.java @@ -19,15 +19,21 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.ssl.SniCompletionEvent; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.incubator.codec.http3.Http3DataFrame; +import io.netty.incubator.codec.http3.Http3HeadersFrame; import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; import io.netty.incubator.codec.quic.QuicChannel; import org.junit.jupiter.api.AfterEach; @@ -40,6 +46,7 @@ import reactor.core.publisher.Signal; import reactor.core.scheduler.Schedulers; import reactor.netty.ByteBufFlux; +import reactor.netty.ByteBufMono; import reactor.netty.DisposableServer; import reactor.netty.LogTracker; import reactor.netty.NettyPipeline; @@ -58,6 +65,7 @@ import javax.net.ssl.SNIHostName; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.charset.Charset; import java.security.cert.CertificateException; import java.time.Duration; import java.util.ArrayList; @@ -66,6 +74,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; @@ -365,34 +374,6 @@ void testGetRequest() { .verify(Duration.ofSeconds(5)); } - @Test - void testHttpClientNoSecurityHttp3Fails() { - disposableServer = - createServer() - .handle((req, res) -> res.sendString(Mono.just("Hello"))) - .bindNow(); - - createClient(disposableServer.port()) - .noSSL() - .get() - .uri("/") - .responseContent() - .aggregate() - .asString() - .as(StepVerifier::create) - .verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT); - } - - @Test - void testHttpServerNoSecurityHttp3Fails() { - createServer() - .noSSL() - .handle((req, res) -> res.sendString(Mono.just("Hello"))) - .bind() - .as(StepVerifier::create) - .verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER); - } - @Test void testHttp3ForMemoryLeaks() { disposableServer = @@ -428,6 +409,64 @@ void testHttp3ForMemoryLeaks() { System.gc(); } + @Test + void testHttpClientNoSecurityHttp3Fails() { + disposableServer = + createServer() + .handle((req, res) -> res.sendString(Mono.just("Hello"))) + .bindNow(); + + createClient(disposableServer.port()) + .noSSL() + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .verifyErrorMessage(HTTP3_WITHOUT_TLS_CLIENT); + } + + @Test + void testHttpServerNoSecurityHttp3Fails() { + createServer() + .noSSL() + .handle((req, res) -> res.sendString(Mono.just("Hello"))) + .bind() + .as(StepVerifier::create) + .verifyErrorMessage(HTTP3_WITHOUT_TLS_SERVER); + } + + @Test + void testIssue3524Flux() { + // sends the message and then last http content + testRequestBody(sender -> sender.send((req, out) -> out.sendString(Flux.just("te", "st"))), 3); + } + + @Test + void testIssue3524Mono() { + // sends "full" request + testRequestBody(sender -> sender.send((req, out) -> out.sendString(Mono.just("test"))), 1); + } + + @Test + void testIssue3524MonoEmpty() { + // sends "full" request + testRequestBody(sender -> sender.send((req, out) -> Mono.empty()), 0); + } + + @Test + void testIssue3524NoBody() { + // sends "full" request + testRequestBody(sender -> sender.send((req, out) -> out), 0); + } + + @Test + void testIssue3524Object() { + // sends "full" request + testRequestBody(sender -> sender.send((req, out) -> out.sendObject(Unpooled.wrappedBuffer("test".getBytes(Charset.defaultCharset())))), 1); + } + @Test void testMaxActiveStreamsCustomPool() throws Exception { ConnectionProvider provider = ConnectionProvider.create("testMaxActiveStreamsCustomPool", 1); @@ -604,6 +643,77 @@ void testMetrics() throws Exception { } } + @Test + void testMonoRequestBodySentAsFullRequest_Flux() { + // sends the message and then last http content + testRequestBody(sender -> sender.send(ByteBufFlux.fromString(Mono.just("test"))), 2); + } + + @Test + void testMonoRequestBodySentAsFullRequest_Mono() { + // sends "full" request + testRequestBody(sender -> sender.send(ByteBufMono.fromString(Mono.just("test"))), 1); + } + + @Test + void testMonoRequestBodySentAsFullRequest_MonoEmpty() { + // sends "full" request + testRequestBody(sender -> sender.send(Mono.empty()), 0); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void testRequestBody(Function> sendFunction, int expectedMsg) { + disposableServer = + createServer().handle((req, res) -> req.receive() + .then(res.send())) + .bindNow(Duration.ofSeconds(30)); + + AtomicInteger counterHeaders = new AtomicInteger(); + AtomicInteger counterData = new AtomicInteger(); + sendFunction.apply( + createClient(disposableServer.port()) + .port(disposableServer.port()) + .doOnRequest((req, conn) -> { + ChannelPipeline pipeline = conn.channel().pipeline(); + ChannelHandlerContext ctx = pipeline.context(NettyPipeline.LoggingHandler); + if (ctx != null) { + pipeline.addAfter(ctx.name(), "testRequestBody", + new ChannelOutboundHandlerAdapter() { + boolean done; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.channel().closeFuture().addListener(f -> done = true); + super.handlerAdded(ctx); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (!done) { + if (msg instanceof Http3HeadersFrame) { + counterHeaders.getAndIncrement(); + } + else if (msg instanceof Http3DataFrame) { + counterData.getAndIncrement(); + } + } + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } + }); + } + }) + .post() + .uri("/")) + .responseContent() + .aggregate() + .asString() + .block(Duration.ofSeconds(30)); + + assertThat(counterHeaders.get()).isEqualTo(1); + assertThat(counterData.get()).isEqualTo(expectedMsg); + } + @Test void testPostRequest() { doTestPostRequest(false); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http3ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http3ConnectionProvider.java index ace8a6dd6e..302a59951b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http3ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http3ConnectionProvider.java @@ -67,7 +67,6 @@ import static reactor.netty.ReactorNetty.format; import static reactor.netty.ReactorNetty.getChannelContext; import static reactor.netty.ReactorNetty.setChannelContext; -import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; /** * An HTTP/3 implementation for pooled {@link ConnectionProvider}. @@ -404,7 +403,6 @@ public void operationComplete(Future future) { ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { - obs.onStateChange(ops, STREAM_CONFIGURED); sink.success(ops); } }