Skip to content

Commit

Permalink
Add limited support for Bidi streams over HTTP1
Browse files Browse the repository at this point in the history
This adds limited support for bidirectional streams over HTTP1
transports. It removes the explicit error response which disabled this
behavior based on the protocol stream type. Bidi streams may now be used
in half-duplex mode. This does not enable full-duplex streaming. All
requests must be sent before responses are received.

This removes the workarounds in the conformance test suite to fake a
HTTP2 request/response in order to allow for this behavior. In doing so
a deadlock on writing to a closed requests was fixed.

Signed-off-by: Edward McFarlane <emcfarlane@buf.build>
  • Loading branch information
emcfarlane committed Nov 8, 2024
1 parent 74a6754 commit b6793c6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 96 deletions.
128 changes: 51 additions & 77 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,16 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.Close())
})
}
testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, expectSuccess bool) { //nolint:thelper
testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, bidiSupported bool) { //nolint:thelper
t.Run("cumsum", func(t *testing.T) {
if !bidiSupported {
t.Skip("transport doesn't support bidi streaming")
}
send := []int64{3, 5, 1}
expect := []int64{3, 8, 9}
var got []int64
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand Down Expand Up @@ -249,11 +248,10 @@ func TestServer(t *testing.T) {
assert.Equal(t, stream.ResponseTrailer().Values(handlerTrailer), []string{trailerValue})
})
t.Run("cumsum_error", func(t *testing.T) {
stream := client.CumSum(context.Background())
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
if !bidiSupported {
t.Skip("transport doesn't support bidi streaming")
}
stream := client.CumSum(context.Background())
if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil {
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
Expand All @@ -265,12 +263,11 @@ func TestServer(t *testing.T) {
assert.True(t, connect.IsWireError(err))
})
t.Run("cumsum_empty_stream", func(t *testing.T) {
if !bidiSupported {
t.Skip("transport doesn't support bidi streaming")
}
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
}
// Deliberately closing with calling Send to test the behavior of Receive.
// This test case is based on the grpc interop tests.
assert.Nil(t, stream.CloseRequest())
Expand All @@ -281,14 +278,12 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseResponse()) // clean-up the stream
})
t.Run("cumsum_cancel_after_first_response", func(t *testing.T) {
if !bidiSupported {
t.Skip("transport doesn't support bidi streaming")
}
ctx, cancel := context.WithCancel(context.Background())
stream := client.CumSum(ctx)
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
cancel()
return
}
var got []int64
expect := []int64{42}
if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil {
Expand All @@ -306,13 +301,11 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseResponse())
})
t.Run("cumsum_cancel_before_send", func(t *testing.T) {
if !bidiSupported {
t.Skip("transport doesn't support bidi streaming")
}
ctx, cancel := context.WithCancel(context.Background())
stream := client.CumSum(ctx)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
cancel()
return
}
stream.RequestHeader().Set(clientHeader, headerValue)
assert.Nil(t, stream.Send(&pingv1.CumSumRequest{Number: 8}))
cancel()
Expand All @@ -324,6 +317,27 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
t.Run("cumsum_unsupported", func(t *testing.T) {
if bidiSupported {
t.Skip("transport supports bidi streaming")
}
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
err := stream.Send(&pingv1.CumSumRequest{Number: 1})
assert.Nil(t, err)

response, err := stream.Receive()
assert.Nil(t, err)
assert.NotNil(t, response)
assert.Equal(t, response.GetSum(), 1)

// Stream must now error as HTTP1 doesn't support full-duplex.
err = stream.Send(&pingv1.CumSumRequest{Number: 2})
assert.ErrorIs(t, err, io.EOF)

assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
}
testErrors := func(t *testing.T, client pingv1connect.PingServiceClient) { //nolint:thelper
assertIsHTTPMiddlewareError := func(tb testing.TB, err error) {
Expand Down Expand Up @@ -946,35 +960,6 @@ func TestUnavailableIfHostInvalid(t *testing.T) {
assert.Equal(t, connect.CodeOf(err), connect.CodeUnavailable)
}

func TestBidiRequiresHTTP2(t *testing.T) {
t.Parallel()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/connect+proto")
_, err := io.WriteString(w, "hello world")
assert.Nil(t, err)
})
server := memhttptest.NewServer(t, handler)
client := pingv1connect.NewPingServiceClient(
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
assert.Nil(t, stream.CloseRequest())
_, err := stream.Receive()
assert.NotNil(t, err)
var connectErr *connect.Error
assert.True(t, errors.As(err, &connectErr))
assert.Equal(t, connectErr.Code(), connect.CodeUnimplemented)
assert.True(
t,
strings.HasSuffix(connectErr.Message(), ": bidi streams require at least HTTP/2"),
)
}

func TestCompressMinBytesClient(t *testing.T) {
t.Parallel()
assertContentType := func(tb testing.TB, text, expect string) {
Expand Down Expand Up @@ -2180,15 +2165,21 @@ func TestBidiOverHTTP1(t *testing.T) {
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
)
// Create a BIDI stream, after receiving a response the stream should be
// closed as HTTP1 doesn't support full-duplex.
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
_, err := stream.Receive()
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
err := stream.Send(&pingv1.CumSumRequest{Number: 1})
assert.Nil(t, err)

response, err := stream.Receive()
assert.Nil(t, err)
assert.NotNil(t, response)
assert.Equal(t, response.GetSum(), 1)

// Stream must now error as HTTP1 doesn't support full-duplex.
err = stream.Send(&pingv1.CumSumRequest{Number: 2})
assert.ErrorIs(t, err, io.EOF)

assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
}
Expand Down Expand Up @@ -2771,23 +2762,6 @@ func (p *pluggablePingServer) CumSum(
return p.cumSum(ctx, stream)
}

func failNoHTTP2(tb testing.TB, stream *connect.BidiStreamForClient[pingv1.CumSumRequest, pingv1.CumSumResponse]) {
tb.Helper()
if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(tb, err, io.EOF)
assert.Equal(tb, connect.CodeOf(err), connect.CodeUnknown)
}
assert.Nil(tb, stream.CloseRequest())
_, err := stream.Receive()
assert.NotNil(tb, err) // should be 505
assert.True(
tb,
strings.Contains(err.Error(), "HTTP status 505"),
assert.Sprintf("expected 505, got %v", err),
)
assert.Nil(tb, stream.CloseResponse())
}

func expectClientHeader(check bool, req connect.AnyRequest) error {
if !check {
return nil
Expand Down
13 changes: 3 additions & 10 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,16 +332,9 @@ func (d *duplexHTTPCall) makeRequest() {
_ = d.CloseWrite()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.responseErr = errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
)
if response.ProtoMajor < 2 {
// HTTP/1.x doesn't support bidirectional streaming. We need to close the
// write side of the stream before we can read from the response body.
_ = d.CloseWrite()
}
}
Expand Down
9 changes: 0 additions & 9 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
// EOF: the stream we construct later on already does that, and we only
// return early when dealing with misbehaving clients. In those cases, it's
// okay if we can't re-use the connection.
isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi
if isBidi && request.ProtoMajor < 2 {
// Clients coded to expect full-duplex connections may hang if they've
// mistakenly negotiated HTTP/1.1. To unblock them, we must close the
// underlying TCP connection.
responseWriter.Header().Set("Connection", "close")
responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported)
return
}

protocolHandlers := h.protocolHandlers[request.Method]
if len(protocolHandlers) == 0 {
Expand Down

0 comments on commit b6793c6

Please sign in to comment.