From b6793c683db1c5c6b706bfe7a969e29f8174a3dc Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Fri, 8 Nov 2024 15:15:47 -0500 Subject: [PATCH 1/2] Add limited support for Bidi streams over HTTP1 This adds limited support for bidirectional streams over HTTP1 transports. It removes the explicit error response which disabled this behavior based on the protocol stream type. Bidi streams may now be used in half-duplex mode. This does not enable full-duplex streaming. All requests must be sent before responses are received. This removes the workarounds in the conformance test suite to fake a HTTP2 request/response in order to allow for this behavior. In doing so a deadlock on writing to a closed requests was fixed. Signed-off-by: Edward McFarlane --- connect_ext_test.go | 128 ++++++++++++++++++-------------------------- duplex_http_call.go | 13 ++--- handler.go | 9 ---- 3 files changed, 54 insertions(+), 96 deletions(-) diff --git a/connect_ext_test.go b/connect_ext_test.go index b93c5708..c68eb6cc 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -210,17 +210,16 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.Close()) }) } - testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, expectSuccess bool) { //nolint:thelper + testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, bidiSupported bool) { //nolint:thelper t.Run("cumsum", func(t *testing.T) { + if !bidiSupported { + t.Skip("transport doesn't support bidi streaming") + } send := []int64{3, 5, 1} expect := []int64{3, 8, 9} var got []int64 stream := client.CumSum(context.Background()) stream.RequestHeader().Set(clientHeader, headerValue) - if !expectSuccess { // server doesn't support HTTP/2 - failNoHTTP2(t, stream) - return - } var wg sync.WaitGroup wg.Add(2) go func() { @@ -249,11 +248,10 @@ func TestServer(t *testing.T) { assert.Equal(t, stream.ResponseTrailer().Values(handlerTrailer), []string{trailerValue}) }) t.Run("cumsum_error", func(t *testing.T) { - stream := client.CumSum(context.Background()) - if !expectSuccess { // server doesn't support HTTP/2 - failNoHTTP2(t, stream) - return + if !bidiSupported { + t.Skip("transport doesn't support bidi streaming") } + stream := client.CumSum(context.Background()) if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil { assert.ErrorIs(t, err, io.EOF) assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) @@ -265,12 +263,11 @@ func TestServer(t *testing.T) { assert.True(t, connect.IsWireError(err)) }) t.Run("cumsum_empty_stream", func(t *testing.T) { + if !bidiSupported { + t.Skip("transport doesn't support bidi streaming") + } stream := client.CumSum(context.Background()) stream.RequestHeader().Set(clientHeader, headerValue) - if !expectSuccess { // server doesn't support HTTP/2 - failNoHTTP2(t, stream) - return - } // Deliberately closing with calling Send to test the behavior of Receive. // This test case is based on the grpc interop tests. assert.Nil(t, stream.CloseRequest()) @@ -281,14 +278,12 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseResponse()) // clean-up the stream }) t.Run("cumsum_cancel_after_first_response", func(t *testing.T) { + if !bidiSupported { + t.Skip("transport doesn't support bidi streaming") + } ctx, cancel := context.WithCancel(context.Background()) stream := client.CumSum(ctx) stream.RequestHeader().Set(clientHeader, headerValue) - if !expectSuccess { // server doesn't support HTTP/2 - failNoHTTP2(t, stream) - cancel() - return - } var got []int64 expect := []int64{42} if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil { @@ -306,13 +301,11 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseResponse()) }) t.Run("cumsum_cancel_before_send", func(t *testing.T) { + if !bidiSupported { + t.Skip("transport doesn't support bidi streaming") + } ctx, cancel := context.WithCancel(context.Background()) stream := client.CumSum(ctx) - if !expectSuccess { // server doesn't support HTTP/2 - failNoHTTP2(t, stream) - cancel() - return - } stream.RequestHeader().Set(clientHeader, headerValue) assert.Nil(t, stream.Send(&pingv1.CumSumRequest{Number: 8})) cancel() @@ -324,6 +317,27 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseRequest()) assert.Nil(t, stream.CloseResponse()) }) + t.Run("cumsum_unsupported", func(t *testing.T) { + if bidiSupported { + t.Skip("transport supports bidi streaming") + } + stream := client.CumSum(context.Background()) + stream.RequestHeader().Set(clientHeader, headerValue) + err := stream.Send(&pingv1.CumSumRequest{Number: 1}) + assert.Nil(t, err) + + response, err := stream.Receive() + assert.Nil(t, err) + assert.NotNil(t, response) + assert.Equal(t, response.GetSum(), 1) + + // Stream must now error as HTTP1 doesn't support full-duplex. + err = stream.Send(&pingv1.CumSumRequest{Number: 2}) + assert.ErrorIs(t, err, io.EOF) + + assert.Nil(t, stream.CloseRequest()) + assert.Nil(t, stream.CloseResponse()) + }) } testErrors := func(t *testing.T, client pingv1connect.PingServiceClient) { //nolint:thelper assertIsHTTPMiddlewareError := func(tb testing.TB, err error) { @@ -946,35 +960,6 @@ func TestUnavailableIfHostInvalid(t *testing.T) { assert.Equal(t, connect.CodeOf(err), connect.CodeUnavailable) } -func TestBidiRequiresHTTP2(t *testing.T) { - t.Parallel() - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/connect+proto") - _, err := io.WriteString(w, "hello world") - assert.Nil(t, err) - }) - server := memhttptest.NewServer(t, handler) - client := pingv1connect.NewPingServiceClient( - &http.Client{Transport: server.TransportHTTP1()}, - server.URL(), - ) - stream := client.CumSum(context.Background()) - // Stream creates an async request, can error on Send or Receive. - if err := stream.Send(&pingv1.CumSumRequest{}); err != nil { - assert.ErrorIs(t, err, io.EOF) - } - assert.Nil(t, stream.CloseRequest()) - _, err := stream.Receive() - assert.NotNil(t, err) - var connectErr *connect.Error - assert.True(t, errors.As(err, &connectErr)) - assert.Equal(t, connectErr.Code(), connect.CodeUnimplemented) - assert.True( - t, - strings.HasSuffix(connectErr.Message(), ": bidi streams require at least HTTP/2"), - ) -} - func TestCompressMinBytesClient(t *testing.T) { t.Parallel() assertContentType := func(tb testing.TB, text, expect string) { @@ -2180,15 +2165,21 @@ func TestBidiOverHTTP1(t *testing.T) { &http.Client{Transport: server.TransportHTTP1()}, server.URL(), ) + // Create a BIDI stream, after receiving a response the stream should be + // closed as HTTP1 doesn't support full-duplex. stream := client.CumSum(context.Background()) - // Stream creates an async request, can error on Send or Receive. - if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil { - assert.ErrorIs(t, err, io.EOF) - } - _, err := stream.Receive() - assert.NotNil(t, err) - assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) - assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") + err := stream.Send(&pingv1.CumSumRequest{Number: 1}) + assert.Nil(t, err) + + response, err := stream.Receive() + assert.Nil(t, err) + assert.NotNil(t, response) + assert.Equal(t, response.GetSum(), 1) + + // Stream must now error as HTTP1 doesn't support full-duplex. + err = stream.Send(&pingv1.CumSumRequest{Number: 2}) + assert.ErrorIs(t, err, io.EOF) + assert.Nil(t, stream.CloseRequest()) assert.Nil(t, stream.CloseResponse()) } @@ -2771,23 +2762,6 @@ func (p *pluggablePingServer) CumSum( return p.cumSum(ctx, stream) } -func failNoHTTP2(tb testing.TB, stream *connect.BidiStreamForClient[pingv1.CumSumRequest, pingv1.CumSumResponse]) { - tb.Helper() - if err := stream.Send(&pingv1.CumSumRequest{}); err != nil { - assert.ErrorIs(tb, err, io.EOF) - assert.Equal(tb, connect.CodeOf(err), connect.CodeUnknown) - } - assert.Nil(tb, stream.CloseRequest()) - _, err := stream.Receive() - assert.NotNil(tb, err) // should be 505 - assert.True( - tb, - strings.Contains(err.Error(), "HTTP status 505"), - assert.Sprintf("expected 505, got %v", err), - ) - assert.Nil(tb, stream.CloseResponse()) -} - func expectClientHeader(check bool, req connect.AnyRequest) error { if !check { return nil diff --git a/duplex_http_call.go b/duplex_http_call.go index 38d14ade..8bdab277 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -332,16 +332,9 @@ func (d *duplexHTTPCall) makeRequest() { _ = d.CloseWrite() return } - if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { - // If we somehow dialed an HTTP/1.x server, fail with an explicit message - // rather than returning a more cryptic error later on. - d.responseErr = errorf( - CodeUnimplemented, - "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", - d.request.URL, - response.ProtoMajor, - response.ProtoMinor, - ) + if response.ProtoMajor < 2 { + // HTTP/1.x doesn't support bidirectional streaming. We need to close the + // write side of the stream before we can read from the response body. _ = d.CloseWrite() } } diff --git a/handler.go b/handler.go index 9f95627b..ccf4bca5 100644 --- a/handler.go +++ b/handler.go @@ -161,15 +161,6 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re // EOF: the stream we construct later on already does that, and we only // return early when dealing with misbehaving clients. In those cases, it's // okay if we can't re-use the connection. - isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi - if isBidi && request.ProtoMajor < 2 { - // Clients coded to expect full-duplex connections may hang if they've - // mistakenly negotiated HTTP/1.1. To unblock them, we must close the - // underlying TCP connection. - responseWriter.Header().Set("Connection", "close") - responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported) - return - } protocolHandlers := h.protocolHandlers[request.Method] if len(protocolHandlers) == 0 { From fbe3ccb0f6c92a7d0583675eb56eb7a649f446df Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Fri, 8 Nov 2024 15:33:12 -0500 Subject: [PATCH 2/2] Clarify testcase var Signed-off-by: Edward McFarlane --- connect_ext_test.go | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/connect_ext_test.go b/connect_ext_test.go index c68eb6cc..35b69868 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -210,10 +210,10 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.Close()) }) } - testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, bidiSupported bool) { //nolint:thelper + testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, fullDuplex bool) { //nolint:thelper t.Run("cumsum", func(t *testing.T) { - if !bidiSupported { - t.Skip("transport doesn't support bidi streaming") + if !fullDuplex { + t.Skip("transport doesn't support full-duplex streaming") } send := []int64{3, 5, 1} expect := []int64{3, 8, 9} @@ -248,8 +248,8 @@ func TestServer(t *testing.T) { assert.Equal(t, stream.ResponseTrailer().Values(handlerTrailer), []string{trailerValue}) }) t.Run("cumsum_error", func(t *testing.T) { - if !bidiSupported { - t.Skip("transport doesn't support bidi streaming") + if !fullDuplex { + t.Skip("transport doesn't support full-duplex streaming") } stream := client.CumSum(context.Background()) if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil { @@ -263,8 +263,8 @@ func TestServer(t *testing.T) { assert.True(t, connect.IsWireError(err)) }) t.Run("cumsum_empty_stream", func(t *testing.T) { - if !bidiSupported { - t.Skip("transport doesn't support bidi streaming") + if !fullDuplex { + t.Skip("transport doesn't support full-duplex streaming") } stream := client.CumSum(context.Background()) stream.RequestHeader().Set(clientHeader, headerValue) @@ -278,8 +278,8 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseResponse()) // clean-up the stream }) t.Run("cumsum_cancel_after_first_response", func(t *testing.T) { - if !bidiSupported { - t.Skip("transport doesn't support bidi streaming") + if !fullDuplex { + t.Skip("transport doesn't support full-duplex streaming") } ctx, cancel := context.WithCancel(context.Background()) stream := client.CumSum(ctx) @@ -301,8 +301,8 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseResponse()) }) t.Run("cumsum_cancel_before_send", func(t *testing.T) { - if !bidiSupported { - t.Skip("transport doesn't support bidi streaming") + if !fullDuplex { + t.Skip("transport doesn't support full-duplex streaming") } ctx, cancel := context.WithCancel(context.Background()) stream := client.CumSum(ctx) @@ -317,9 +317,11 @@ func TestServer(t *testing.T) { assert.Nil(t, stream.CloseRequest()) assert.Nil(t, stream.CloseResponse()) }) - t.Run("cumsum_unsupported", func(t *testing.T) { - if bidiSupported { - t.Skip("transport supports bidi streaming") + t.Run("cumsum_half_duplex", func(t *testing.T) { + // This test is for HTTP1, which doesn't support full-duplex streaming. + // We expect the stream to error after the first response. + if fullDuplex { + t.Skip("transport supports full-duplex streaming") } stream := client.CumSum(context.Background()) stream.RequestHeader().Set(clientHeader, headerValue) @@ -387,14 +389,14 @@ func TestServer(t *testing.T) { assertIsHTTPMiddlewareError(t, stream.Err()) }) } - testMatrix := func(t *testing.T, client *http.Client, url string, bidi bool) { //nolint:thelper + testMatrix := func(t *testing.T, client *http.Client, url string, fullDuplex bool) { //nolint:thelper run := func(t *testing.T, opts ...connect.ClientOption) { t.Helper() client := pingv1connect.NewPingServiceClient(client, url, opts...) testPing(t, client) testSum(t, client) testCountUp(t, client) - testCumSum(t, client, bidi) + testCumSum(t, client, fullDuplex) testErrors(t, client) } t.Run("connect", func(t *testing.T) { @@ -500,13 +502,13 @@ func TestServer(t *testing.T) { t.Parallel() server := memhttptest.NewServer(t, mux) client := &http.Client{Transport: server.TransportHTTP1()} - testMatrix(t, client, server.URL(), false /* bidi */) + testMatrix(t, client, server.URL(), false /* full-duplex */) }) t.Run("http2", func(t *testing.T) { t.Parallel() server := memhttptest.NewServer(t, mux) client := server.Client() - testMatrix(t, client, server.URL(), true /* bidi */) + testMatrix(t, client, server.URL(), true /* full-duplex */) }) }