Skip to content

Commit

Permalink
Release buffer for end-stream messages back to the pool (#678)
Browse files Browse the repository at this point in the history
We previously were making a copy of every end-stream message
(in grpc-web and connect streaming protocols) and then not
releasing that copy to the buffer pool. This addresses that. The
benchmarks don't show noticeable improvement in durations but
do show the reduced allocations.
  • Loading branch information
jhump authored Feb 13, 2024
1 parent 7233f59 commit 1061b35
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
22 changes: 15 additions & 7 deletions envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,12 @@ type envelopeReader struct {

func (r *envelopeReader) Unmarshal(message any) *Error {
buffer := r.bufferPool.Get()
defer r.bufferPool.Put(buffer)
var dontRelease *bytes.Buffer
defer func() {
if buffer != dontRelease {
r.bufferPool.Put(buffer)
}
}()

env := &envelope{Data: buffer}
err := r.Read(env)
Expand Down Expand Up @@ -256,7 +261,11 @@ func (r *envelopeReader) Unmarshal(message any) *Error {
)
}
decompressed := r.bufferPool.Get()
defer r.bufferPool.Put(decompressed)
defer func() {
if decompressed != dontRelease {
r.bufferPool.Put(decompressed)
}
}()
if err := r.compressionPool.Decompress(decompressed, data, int64(r.readMaxBytes)); err != nil {
return err
}
Expand All @@ -276,14 +285,13 @@ func (r *envelopeReader) Unmarshal(message any) *Error {
}
// One of the protocol-specific flags are set, so this is the end of the
// stream. Save the message for protocol-specific code to process and
// return a sentinel error. Since we've deferred functions to return env's
// underlying buffer to a pool, we need to keep a copy.
copiedData := make([]byte, data.Len())
copy(copiedData, data.Bytes())
// return a sentinel error. We alias the buffer with dontRelease as a
// way of marking it so above defers don't release it to the pool.
r.last = envelope{
Data: bytes.NewBuffer(copiedData),
Data: data,
Flags: env.Flags,
}
dontRelease = data
return errSpecialEnvelope
}

Expand Down
7 changes: 5 additions & 2 deletions protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,12 +879,15 @@ func (u *connectStreamingUnmarshaler) Unmarshal(message any) *Error {
if !errors.Is(err, errSpecialEnvelope) {
return err
}
env := u.envelopeReader.last
env := u.last
data := env.Data
u.last.Data = nil // don't keep a reference to it
defer u.bufferPool.Put(data)
if !env.IsSet(connectFlagEnvelopeEndStream) {
return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
}
var end connectEndStreamMessage
if err := json.Unmarshal(env.Data.Bytes(), &end); err != nil {
if err := json.Unmarshal(data.Bytes(), &end); err != nil {
return errorf(CodeInternal, "unmarshal end stream message: %w", err)
}
for name, value := range end.Trailer {
Expand Down
16 changes: 10 additions & 6 deletions protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,10 @@ func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error {
}

type grpcUnmarshaler struct {
envelopeReader envelopeReader
web bool
webTrailer http.Header
envelopeReader

web bool
webTrailer http.Header
}

func (u *grpcUnmarshaler) Unmarshal(message any) *Error {
Expand All @@ -613,18 +614,21 @@ func (u *grpcUnmarshaler) Unmarshal(message any) *Error {
if !errors.Is(err, errSpecialEnvelope) {
return err
}
env := u.envelopeReader.last
env := u.last
data := env.Data
u.last.Data = nil // don't keep a reference to it
defer u.bufferPool.Put(data)
if !u.web || !env.IsSet(grpcFlagEnvelopeTrailer) {
return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
}

// Per the gRPC-Web specification, trailers should be encoded as an HTTP/1
// headers block _without_ the terminating newline. To make the headers
// parseable by net/textproto, we need to add the newline.
if err := env.Data.WriteByte('\n'); err != nil {
if err := data.WriteByte('\n'); err != nil {
return errorf(CodeInternal, "unmarshal web trailers: %w", err)
}
bufferedReader := bufio.NewReader(env.Data)
bufferedReader := bufio.NewReader(data)
mimeReader := textproto.NewReader(bufferedReader)
mimeHeader, mimeErr := mimeReader.ReadMIMEHeader()
if mimeErr != nil {
Expand Down

0 comments on commit 1061b35

Please sign in to comment.