Skip to content

Commit

Permalink
Implement unary HTTP calls with retry support
Browse files Browse the repository at this point in the history
Changes the internal duplexHTTPCall to switch on non streaming client
requests to block and wait for the response. This avoids the need to
convert the reader to a writer with io.Pipe and the go routine to run
asynchronously. On unary requests we are now able to set the
`Content-Length` header and the `GetBody` function for retries.

To safely reuse the payload buffer a new type `payloadCloser` is added
to implement the required HTTP body semantics. On receiving a response
the request may still be read up and until the response is body is
closed. To ensure the request body is safe to releasee we wait for a
complete read or close on the request body. Retries may read, close or
rewind the body multiple times before a response is returned.
  • Loading branch information
emcfarlane committed Dec 9, 2023
1 parent 52abcce commit f85fce8
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 53 deletions.
16 changes: 15 additions & 1 deletion connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ func TestServer(t *testing.T) {
pingServer{checkMetadata: true},
)
errorWriter := connect.NewErrorWriter()
// Add some net/http middleware to the ping service so we can also exercise ErrorWriter.
// Add net/http middleware to the ping service to evaluate HTTP state.
mux.Handle(pingRoute, http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
// Exercise ErrorWriter for HTTP middleware errors.
if request.Header.Get(clientMiddlewareErrorHeader) != "" {
defer request.Body.Close()
if _, err := io.Copy(io.Discard, request.Body); err != nil {
Expand All @@ -449,6 +450,19 @@ func TestServer(t *testing.T) {
}
return
}
// Check Content-Length is set correctly.
switch request.URL.Path {
case pingv1connect.PingServicePingProcedure,
pingv1connect.PingServiceFailProcedure,
pingv1connect.PingServiceCountUpProcedure:
if request.ContentLength < 0 {
t.Errorf("%s: expected Content-Length >= 0, got %d", request.URL.Path, request.ContentLength)
}
default:
if request.ContentLength > 0 {
t.Errorf("%s: expected Content-Length -1 or 0, got %d", request.URL.Path, request.ContentLength)
}
}
pingHandler.ServeHTTP(response, request)
}))

Expand Down
222 changes: 172 additions & 50 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net/http"
"net/url"
"sync"
"sync/atomic"
)

Expand All @@ -36,10 +37,8 @@ type duplexHTTPCall struct {
onRequestSend func(*http.Request)
validateResponse func(*http.Response) *Error

// We'll use a pipe as the request body. We hand the read side of the pipe to
// net/http, and we write to the write side (naturally). The two ends are
// safe to use concurrently.
requestBodyReader *io.PipeReader
// io.Pipe is used to implement the request body for client streaming calls.
// If the request is unary, requestBodyWriter is nil.
requestBodyWriter *io.PipeWriter

// requestSent ensures we only send the request once.
Expand All @@ -65,7 +64,6 @@ func newDuplexHTTPCall(
// Request. This ensures if a transport out of our control wants
// to mutate the req.URL, we don't feel the effects of it.
url = cloneURL(url)
pipeReader, pipeWriter := io.Pipe()

// This is mirroring what http.NewRequestContext did, but
// using an already parsed url.URL object, rather than a string
Expand All @@ -74,30 +72,40 @@ func newDuplexHTTPCall(
// NewRequestContext and doesn't effect the actual version
// being transmitted.
request := (&http.Request{
Method: http.MethodPost,
URL: url,
Header: header,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: pipeReader,
Host: url.Host,
Method: http.MethodPost,
URL: url,
Header: header,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: http.NoBody,
ContentLength: 0,
Host: url.Host,
}).WithContext(ctx)
return &duplexHTTPCall{
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
requestBodyReader: pipeReader,
requestBodyWriter: pipeWriter,
request: request,
responseReady: make(chan struct{}),
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
request: request,
responseReady: make(chan struct{}),
}
}

// Send sends a message to the server.
func (d *duplexHTTPCall) Send(payload messsagePayload) (int64, error) {
isFirst := d.ensureRequestMade()
// Before we send any data, check if the context has been canceled.
func (d *duplexHTTPCall) Send(payload messagePayload) (int64, error) {
if d.streamType&StreamTypeClient == 0 {
return d.sendUnary(payload)
}
isFirst := d.requestSent.CompareAndSwap(false, true)
if isFirst {
// This is the first time we're sending a message to the server.
// We need to send the request headers and start the request.
pipeReader, pipeWriter := io.Pipe()
d.requestBodyWriter = pipeWriter
d.request.Body = pipeReader
d.request.ContentLength = -1
go d.makeRequest() // concurrent request
}
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
Expand All @@ -113,18 +121,55 @@ func (d *duplexHTTPCall) Send(payload messsagePayload) (int64, error) {
// Signal that the stream is closed with the more-typical io.EOF instead of
// io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to
// match grpc-go's behavior.
return bytesWritten, io.EOF
err = io.EOF
}
return bytesWritten, err
}

func (d *duplexHTTPCall) sendUnary(payload messagePayload) (int64, error) {
// Unary messages are sent as a single HTTP request. We don't need to use a
// pipe for the request body and we don't need to send headers separately.
if !d.requestSent.CompareAndSwap(false, true) {
return 0, fmt.Errorf("request already sent")
}
payloadLength := int64(payload.Len())
if payloadLength > 0 {
// Build the request body from the payload.
payloadBody := newPayloadCloser(payload)
d.request.Body = payloadBody
d.request.ContentLength = payloadLength
d.request.GetBody = func() (io.ReadCloser, error) {
if !payloadBody.Rewind() {
return nil, fmt.Errorf("payload cannot be retried")
}
return payloadBody, nil
}
// Wait on the payloadBody to be completly read or closed before
// returning from Send. This ensures that the payload can be reused
// after Send returns. See [http.RoundTripper] for more details.
defer payloadBody.Wait()
} else {
d.request.GetBody = func() (io.ReadCloser, error) {
return http.NoBody, nil
}
}
d.makeRequest() // synchronous request
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
return payloadLength, nil
}

// Close the request body. Callers *must* call CloseWrite before Read when
// using HTTP/1.x.
func (d *duplexHTTPCall) CloseWrite() error {
// Even if Write was never called, we need to make an HTTP request. This
// ensures that we've sent any headers to the server and that we have an HTTP
// response to read from.
d.ensureRequestMade()
if d.requestSent.CompareAndSwap(false, true) {
go d.makeRequest()
return nil
}
// The user calls CloseWrite to indicate that they're done sending data. It's
// safe to close the write side of the pipe while net/http is reading from
// it.
Expand All @@ -136,7 +181,10 @@ func (d *duplexHTTPCall) CloseWrite() error {
// forever. To make sure users don't have to worry about this, the generated
// code for unary, client streaming, and server streaming RPCs must call
// CloseWrite automatically rather than requiring the user to do it.
return d.requestBodyWriter.Close()
if d.requestBodyWriter != nil {
return d.requestBodyWriter.Close()
}
return d.request.Body.Close()
}

// Header returns the HTTP request headers.
Expand Down Expand Up @@ -171,9 +219,6 @@ func (d *duplexHTTPCall) Read(data []byte) (int, error) {
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
n, err := d.response.Body.Read(data)
return n, wrapIfRSTError(err)
}
Expand Down Expand Up @@ -233,17 +278,6 @@ func (d *duplexHTTPCall) BlockUntilResponseReady() error {
return d.responseErr
}

// ensureRequestMade sends the request headers and starts the response stream.
// It is not safe to call this concurrently. Write and CloseWrite call this but
// ensure that they're not called concurrently.
func (d *duplexHTTPCall) ensureRequestMade() (isFirst bool) {
if d.requestSent.CompareAndSwap(false, true) {
go d.makeRequest()
return true
}
return false
}

func (d *duplexHTTPCall) makeRequest() {
// This runs concurrently with Write and CloseWrite. Read and CloseRead wait
// on d.responseReady, so we can't race with them.
Expand All @@ -253,7 +287,6 @@ func (d *duplexHTTPCall) makeRequest() {
if host := d.request.Header.Get(headerHost); len(host) > 0 {
d.request.Host = host
}

if d.onRequestSend != nil {
d.onRequestSend(d.request)
}
Expand All @@ -272,15 +305,15 @@ func (d *duplexHTTPCall) makeRequest() {
err = NewError(CodeUnavailable, err)
}
d.responseErr = err
d.requestBodyWriter.Close()
_ = d.CloseWrite()
return
}
// We've got a response. We can now read from the response body.
// Closing the response body is delegated to the caller even on error.
d.response = response
if err := d.validateResponse(response); err != nil {
d.responseErr = err
d.requestBodyWriter.Close()
_ = d.CloseWrite()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
Expand All @@ -293,13 +326,13 @@ func (d *duplexHTTPCall) makeRequest() {
response.ProtoMajor,
response.ProtoMinor,
)
d.requestBodyWriter.Close()
_ = d.CloseWrite()
}
}

// messsagePayload is a sized and seekable message payload. The interface is
// implemented by [*bytes.Reader] and *envelope.
type messsagePayload interface {
// messagePayload is a sized and seekable message payload. The interface is
// implemented by [*bytes.Reader] and *envelope. Reads must be non-blocking.
type messagePayload interface {
io.Reader
io.WriterTo
io.Seeker
Expand All @@ -310,7 +343,7 @@ type messsagePayload interface {
// to the server.
type nopPayload struct{}

var _ messsagePayload = nopPayload{}
var _ messagePayload = nopPayload{}

func (nopPayload) Read([]byte) (int, error) {
return 0, io.EOF
Expand All @@ -328,7 +361,7 @@ func (nopPayload) Len() int {
// messageSender sends a message payload. The interface is implemented by
// [*duplexHTTPCall] and writeSender.
type messageSender interface {
Send(messsagePayload) (int64, error)
Send(messagePayload) (int64, error)
}

// writeSender is a sender that writes to an [io.Writer]. Useful for wrapping
Expand All @@ -339,7 +372,7 @@ type writeSender struct {

var _ messageSender = writeSender{}

func (w writeSender) Send(payload messsagePayload) (int64, error) {
func (w writeSender) Send(payload messagePayload) (int64, error) {
return payload.WriteTo(w.writer)
}

Expand All @@ -356,3 +389,92 @@ func cloneURL(oldURL *url.URL) *url.URL {
}
return newURL
}

// payloadCloser is an [io.ReadCloser] that wraps a messagePayload. It's used to
// implement the request body for unary calls. To safely reuse the buffer
// call Wait after the response is received to ensure the payload has been
// drained or closed. After Wait, the payload cannot be rewound. It's safe to
// call Close multiple times.
type payloadCloser struct {
mu sync.Mutex
cond sync.Cond
payload messagePayload
isDone bool // true if the payload has been fully read
}

func newPayloadCloser(payload messagePayload) *payloadCloser {
closer := &payloadCloser{
payload: payload,
}
closer.cond.L = &closer.mu
return closer
}

// Read implements [io.Reader], on error it signals that the payload has been
// fully read.
func (p *payloadCloser) Read(dst []byte) (readN int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.payload == nil {
return 0, io.EOF
}
readN, err = p.payload.Read(dst)
if err != nil || p.payload.Len() == 0 {
p.completeWithLock()
}
return readN, err
}

// WriteTo implements [io.WriterTo]. It signals that the payload has been fully
// read.
func (p *payloadCloser) WriteTo(dst io.Writer) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.isDone {
return 0, nil
}
n, err := p.payload.WriteTo(dst)
p.completeWithLock()
return n, err
}

// Close implements [io.Closer]. It signals completion of the payload.
func (p *payloadCloser) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
p.completeWithLock()
return nil
}

// Rewind rewinds the payload to the beginning. It returns false if the
// payload has been discarded from a previous call to Wait.
func (p *payloadCloser) Rewind() bool {
p.mu.Lock()
defer p.mu.Unlock()
if p.payload == nil {
return false
}
if _, err := p.payload.Seek(0, io.SeekStart); err != nil {
return false
}
p.isDone = false
return true
}

// Wait blocks until the payload has been fully read or closed. After Wait, the
// payload is discarded and cannot be rewound. It's then safe to reuse the
// payload.
func (p *payloadCloser) Wait() {
p.mu.Lock()
for !p.isDone {
p.cond.Wait()
}
p.payload = nil
p.mu.Unlock()
}

// completeWithLock signals that the payload has been fully read.
func (p *payloadCloser) completeWithLock() {
p.isDone = true
p.cond.Broadcast()
}
Loading

0 comments on commit f85fce8

Please sign in to comment.