Skip to content

Commit

Permalink
Fix otlphttpexporter parsing logic for Retry-After
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 12, 2025
1 parent 2493b6e commit 330bf11
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 58 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-otlp-http-exp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix parsing logic for Retry-After in OTLP HTTP protocol.

# One or more tracking issues or pull requests related to the change
issues: [12366]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The value of the Retry-After field can be either an HTTP-date or delay-seconds, but the previous logic only parsed delay-seconds.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
42 changes: 26 additions & 16 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,34 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
}
formattedErr = httphelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err()

if isRetryableStatusCode(resp.StatusCode) {
// A retry duration of 0 seconds will trigger the default backoff policy
// of our caller (retry handler).
retryAfter := 0

// Check if the server is overwhelmed.
// See spec https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlphttp-throttling
isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable
if val := resp.Header.Get(headerRetryAfter); isThrottleError && val != "" {
if seconds, err2 := strconv.Atoi(val); err2 == nil {
retryAfter = seconds
}
if !isRetryableStatusCode(resp.StatusCode) {
return consumererror.NewPermanent(formattedErr)
}

// Check if the server is overwhelmed.
// See spec https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlphttp-throttling
isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable
if isThrottleError {
// Use Values to check whether the header is present at all. If it is present but its first
// value is empty or cannot be parsed, the error is returned without a throttle delay below.
values := resp.Header.Values(headerRetryAfter)
if len(values) == 0 {
return formattedErr
}

return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(retryAfter)*time.Second)
// The value of Retry-After field can be either an HTTP-date or a number of
// seconds to delay after the response is received. See https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3
//
// Retry-After = HTTP-date / delay-seconds
//
// First try to parse delay-seconds, since that is what the receiver will send.
if seconds, err := strconv.Atoi(values[0]); err == nil {
return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(seconds)*time.Second)
}
if date, err := time.Parse(time.RFC1123, values[0]); err == nil {
return exporterhelper.NewThrottleRetry(formattedErr, time.Until(date))
}
return formattedErr
}

return consumererror.NewPermanent(formattedErr)
return formattedErr
}

// Determine if the status code is retryable according to the specification.
Expand Down
104 changes: 62 additions & 42 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -94,117 +94,143 @@ func TestErrorResponses(t *testing.T) {
name string
responseStatus int
responseBody *status.Status
err func(srv *httptest.Server) error
isPermErr bool
checkErr func(t *testing.T, err error, srv *httptest.Server)
headers map[string]string
}{
{
name: "400",
responseStatus: http.StatusBadRequest,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "402",
responseStatus: http.StatusPaymentRequired,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "404",
responseStatus: http.StatusNotFound,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "405",
responseStatus: http.StatusMethodNotAllowed,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "413",
responseStatus: http.StatusRequestEntityTooLarge,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "414",
responseStatus: http.StatusRequestURITooLong,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "431",
responseStatus: http.StatusRequestHeaderFieldsTooLarge,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "429",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.ResourceExhausted, "Quota exceeded"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").Err(),
time.Duration(0)*time.Second)
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "429-Retry-After",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
headers: map[string]string{"Retry-After": "Mon, 09 Feb 2025 15:04:05 GMT"},
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
// Cannot test for the delay part since it depends on now. Check first part (which has a negative duration) and last part:
require.ErrorContains(t, err, "Throttle (-")
require.ErrorContains(t, err, "), error: "+status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "429-Retry-After-Malformed",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
headers: map[string]string{"Retry-After": "Malformed"},
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
// A malformed Retry-After value parses as neither delay-seconds nor an HTTP-date, so the error is returned without a throttle delay.
require.EqualError(t, err, status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "500",
responseStatus: http.StatusInternalServerError,
responseBody: status.New(codes.InvalidArgument, "Internal server error"),
isPermErr: true,
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "502",
responseStatus: http.StatusBadGateway,
responseBody: status.New(codes.InvalidArgument, "Bad gateway"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]").Err(),
time.Duration(0)*time.Second)
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]").String())
},
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").Err(),
time.Duration(0)*time.Second)
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").String())
},
},
{
name: "503-Retry-After",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").Err(),
time.Duration(30)*time.Second)
time.Duration(30)*time.Second).Error())
},
},
{
name: "504",
responseStatus: http.StatusGatewayTimeout,
responseBody: status.New(codes.InvalidArgument, "Gateway timeout"),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]").Err(),
time.Duration(0)*time.Second)
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]").String())
},
},
{
name: "Bad response payload",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, strings.Repeat("a", maxHTTPResponseReadBytes+1)),
err: func(srv *httptest.Server) error {
return exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"503").Err(),
time.Duration(0)*time.Second)
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"503").String())
},
},
}
Expand Down Expand Up @@ -245,12 +271,7 @@ func TestErrorResponses(t *testing.T) {
traces := ptrace.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
require.Error(t, err)

if test.isPermErr {
assert.True(t, consumererror.IsPermanent(err))
} else {
assert.EqualValues(t, test.err(srv), err)
}
test.checkErr(t, err, srv)
})
}
}
Expand All @@ -261,8 +282,7 @@ func TestErrorResponseInvalidResponseBody(t *testing.T) {
Body: io.NopCloser(badReader{}),
ContentLength: 100,
}
status := readResponseStatus(resp)
assert.Nil(t, status)
assert.Nil(t, readResponseStatus(resp))
}

func TestUserAgent(t *testing.T) {
Expand Down

0 comments on commit 330bf11

Please sign in to comment.