Skip to content

Commit

Permalink
Fix OTLP http receiver to correctly set Retry-After
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 13, 2025
1 parent a4ae175 commit b3dd063
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 36 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-http-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix OTLP http receiver to correctly set Retry-After

# One or more tracking issues or pull requests related to the change
issues: [12367]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
25 changes: 3 additions & 22 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"runtime"
"time"

"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -21,6 +20,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/statusutil"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -167,15 +167,15 @@ func processError(err error) error {
}

// Now, this is a real error.
retryInfo := getRetryInfo(st)
retryInfo := statusutil.GetRetryInfo(st)

if !shouldRetry(st.Code(), retryInfo) {
// It is not a retryable error, we should not retry.
return consumererror.NewPermanent(err)
}

// Check if server returned throttling information.
throttleDuration := getThrottleDuration(retryInfo)
throttleDuration := retryInfo.GetRetryDelay().AsDuration()
if throttleDuration != 0 {
// We are throttled. Wait before retrying as requested by the server.
return exporterhelper.NewThrottleRetry(err, throttleDuration)
Expand Down Expand Up @@ -203,22 +203,3 @@ func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool {
// Don't retry on any other code.
return false
}

// getRetryInfo scans the details attached to status and returns the first one
// that is an *errdetails.RetryInfo. It returns nil when no detail has that type.
func getRetryInfo(status *status.Status) *errdetails.RetryInfo {
	details := status.Details()
	for i := range details {
		retryInfo, isRetryInfo := details[i].(*errdetails.RetryInfo)
		if !isRetryInfo {
			continue
		}
		return retryInfo
	}
	return nil
}

// getThrottleDuration converts the RetryDelay of t into a time.Duration.
// It returns 0 when t or its RetryDelay is nil, and also when neither the
// Seconds nor the Nanos component of the delay is positive.
func getThrottleDuration(t *errdetails.RetryInfo) time.Duration {
	if t == nil {
		return 0
	}
	delay := t.RetryDelay
	if delay == nil {
		return 0
	}
	// Only a delay with at least one positive component counts as throttling.
	if delay.Seconds <= 0 && delay.Nanos <= 0 {
		return 0
	}
	seconds := time.Duration(delay.Seconds) * time.Second
	nanos := time.Duration(delay.Nanos) * time.Nanosecond
	return seconds + nanos
}
5 changes: 2 additions & 3 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/httphelper"
"go.opentelemetry.io/collector/internal/statusutil"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -220,7 +220,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
"error exporting items, request to %s responded with HTTP Status Code %d",
url, resp.StatusCode)
}
formattedErr = httphelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err()
formattedErr = statusutil.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err()

if !isRetryableStatusCode(resp.StatusCode) {
return consumererror.NewPermanent(formattedErr)
Expand All @@ -247,7 +247,6 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
if date, err := time.Parse(time.RFC1123, values[0]); err == nil {
return exporterhelper.NewThrottleRetry(formattedErr, time.Until(date))
}
return formattedErr
}
return formattedErr
}
Expand Down
12 changes: 11 additions & 1 deletion internal/httphelper/helper.go → internal/statusutil/helper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package httphelper // import "go.opentelemetry.io/collector/internal/httphelper"
package statusutil // import "go.opentelemetry.io/collector/internal/statusutil"

import (
"net/http"

"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -34,3 +35,12 @@ func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status {
}
return status.New(c, errMsg)
}

// GetRetryInfo returns the first *errdetails.RetryInfo found among the details
// of status, or nil if none of the details is a RetryInfo.
func GetRetryInfo(status *status.Status) *errdetails.RetryInfo {
	for _, d := range status.Details() {
		switch info := d.(type) {
		case *errdetails.RetryInfo:
			return info
		}
	}
	return nil
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package httphelper
package statusutil

import (
"net/http"
Expand Down
37 changes: 28 additions & 9 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"io"
"mime"
"net/http"
"strconv"
"time"

spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/httphelper"
"go.opentelemetry.io/collector/internal/statusutil"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
Expand Down Expand Up @@ -185,28 +186,46 @@ func writeError(w http.ResponseWriter, encoder encoder, err error, statusCode in
if ok {
statusCode = errors.GetHTTPStatusCodeFromStatus(s)
} else {
s = httphelper.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode)
s = statusutil.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode)
}
writeStatusResponse(w, encoder, statusCode, s.Proto())
writeStatusResponse(w, encoder, statusCode, s)
}

// errorHandler encodes the HTTP error message inside a rpc.Status message as required
// by the OTLP protocol.
//
// The status is built from errMsg and statusCode, and the encoder is selected from the
// mime type of the request's Content-Type header: pbContentType uses pbEncoder and
// jsonContentType uses jsEncoder. Any other mime type gets the fallback response below.
func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) {
s := httphelper.NewStatusFromMsgAndHTTPCode(errMsg, statusCode)
s := statusutil.NewStatusFromMsgAndHTTPCode(errMsg, statusCode)
switch getMimeTypeFromContentType(r.Header.Get("Content-Type")) {
case pbContentType:
writeStatusResponse(w, pbEncoder, statusCode, s.Proto())
writeStatusResponse(w, pbEncoder, statusCode, s)
return
case jsonContentType:
writeStatusResponse(w, jsEncoder, statusCode, s.Proto())
writeStatusResponse(w, jsEncoder, statusCode, s)
return
}
// No encoder matches the request's content type, so the caller's statusCode is
// dropped and the fallback message is sent with an HTTP 500 status instead.
writeResponse(w, fallbackContentType, http.StatusInternalServerError, fallbackMsg)
}

func writeStatusResponse(w http.ResponseWriter, enc encoder, statusCode int, rsp *spb.Status) {
msg, err := enc.marshalStatus(rsp)
func writeStatusResponse(w http.ResponseWriter, enc encoder, statusCode int, st *status.Status) {
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#otlphttp-throttling
if statusCode == http.StatusTooManyRequests || statusCode == http.StatusServiceUnavailable {
retryInfo := statusutil.GetRetryInfo(st)
// Check if server returned throttling information.
throttleDuration := retryInfo.GetRetryDelay().AsDuration()
// A retry duration of 0 seconds will trigger the default backoff policy
// of our caller, no need to send.
if throttleDuration != 0 {
// We are throttled. Wait before retrying as requested by the server.
// The value of Retry-After field can be either an HTTP-date or a number of
// seconds to delay after the response is received. See https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3
//
// Retry-After = HTTP-date / delay-seconds
//
// Use delay-seconds since it is easier to format and does not require clock synchronization.
w.Header().Set("Retry-After", strconv.FormatInt(int64(throttleDuration/time.Second), 10))
}
}
msg, err := enc.marshalStatus(st.Proto())
if err != nil {
writeResponse(w, fallbackContentType, http.StatusInternalServerError, fallbackMsg)
return
Expand Down
122 changes: 122 additions & 0 deletions receiver/otlpreceiver/otlphttp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpreceiver

import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)

func TestHttpRetryAfter(t *testing.T) {
tests := []struct {
name string
contentType string
err error
expectedStatusCode int
expectedHasRetryAfter bool
expectedRetryAfter string
}{
{
name: "StatusErrorRetryableNoRetryAfter",
err: status.New(codes.DeadlineExceeded, "").Err(),
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "StatusErrorRetryableRetryAfter",
err: func() error {
st := status.New(codes.ResourceExhausted, "")
dt, err := st.WithDetails(&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Second),
})
require.NoError(t, err)
return dt.Err()
}(),
expectedStatusCode: http.StatusTooManyRequests,
expectedHasRetryAfter: true,
expectedRetryAfter: "13",
},
{
name: "StatusErrorNotRetryableRetryAfter",
err: func() error {
st := status.New(codes.Unknown, "")
dt, err := st.WithDetails(&errdetails.RetryInfo{
RetryDelay: durationpb.New(12 * time.Second),
})
require.NoError(t, err)
return dt.Err()
}(),
expectedStatusCode: http.StatusInternalServerError,
},
{
name: "StatusErrorNotRetryableNoRetryAfter",
err: status.New(codes.InvalidArgument, "").Err(),
expectedStatusCode: http.StatusBadRequest,
},
}
addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := newErrOrSinkConsumer()
recv := newHTTPReceiver(t, componenttest.NewNopTelemetrySettings(), addr, sink)

require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, recv.Shutdown(context.Background())) })

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink.Reset()
sink.SetConsumeError(tt.err)

for _, dr := range generateDataRequests(t) {
url := "http://" + addr + dr.path
req := createHTTPRequest(t, url, "", "application/x-protobuf", dr.protoBytes)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)

respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

require.NoError(t, resp.Body.Close())
// For cases like "application/json; charset=utf-8", the response will be only "application/json"
require.True(t, strings.HasPrefix(strings.ToLower("application/x-protobuf"), resp.Header.Get("Content-Type")))
if tt.expectedHasRetryAfter {
require.Equal(t, tt.expectedRetryAfter, resp.Header.Get("Retry-After"))
} else {
require.Len(t, resp.Header.Get("Retry-After"), 0)
}

assert.Equal(t, tt.expectedStatusCode, resp.StatusCode)

if tt.err == nil {
tr := ptraceotlp.NewExportResponse()
require.NoError(t, tr.UnmarshalProto(respBytes))
sink.checkData(t, dr.data, 1)
} else {
errStatus := &spb.Status{}
require.NoError(t, proto.Unmarshal(respBytes, errStatus))
s, ok := status.FromError(tt.err)
require.True(t, ok)
assert.True(t, proto.Equal(errStatus, s.Proto()))
}
}
})
}
}

0 comments on commit b3dd063

Please sign in to comment.