Skip to content

Commit

Permalink
Propagate errors from exporters to receivers
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed May 15, 2023
1 parent 6542100 commit 0e2400a
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 21 deletions.
10 changes: 4 additions & 6 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (

"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/status"
gstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/errs"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -153,13 +155,9 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e
// Format the error message. Use the status if it is present in the response.
var formattedErr error
if respStatus != nil {
formattedErr = fmt.Errorf(
"error exporting items, request to %s responded with HTTP Status Code %d, Message=%s, Details=%v",
url, resp.StatusCode, respStatus.Message, respStatus.Details)
formattedErr = gstatus.ErrorProto(respStatus)
} else {
formattedErr = fmt.Errorf(
"error exporting items, request to %s responded with HTTP Status Code %d",
url, resp.StatusCode)
formattedErr = errs.NewRequestError(resp.StatusCode, "error exporting items to %q", url)
}

if isRetryableStatusCode(resp.StatusCode) {
Expand Down
15 changes: 7 additions & 8 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ func startAndCleanup(t *testing.T, cmp component.Component) {

func TestErrorResponses(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
errMsgPrefix := fmt.Sprintf("error exporting items, request to http://%s/v1/traces responded with HTTP Status Code ", addr)

tests := []struct {
name string
Expand Down Expand Up @@ -436,11 +435,11 @@ func TestErrorResponses(t *testing.T) {
isPermErr: true,
},
{
name: "419",
name: "429",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"429, Message=Quota exceeded, Details=[]"),
status.New(codes.InvalidArgument, "Quota exceeded").Err(),
time.Duration(0)*time.Second),
},
{
Expand All @@ -454,15 +453,15 @@ func TestErrorResponses(t *testing.T) {
responseStatus: http.StatusBadGateway,
responseBody: status.New(codes.InvalidArgument, "Bad gateway"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"502, Message=Bad gateway, Details=[]"),
status.New(codes.InvalidArgument, "Bad gateway").Err(),
time.Duration(0)*time.Second),
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
status.New(codes.InvalidArgument, "Server overloaded").Err(),
time.Duration(0)*time.Second),
},
{
Expand All @@ -471,15 +470,15 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
status.New(codes.InvalidArgument, "Server overloaded").Err(),
time.Duration(30)*time.Second),
},
{
name: "504",
responseStatus: http.StatusGatewayTimeout,
responseBody: status.New(codes.InvalidArgument, "Gateway timeout"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"504, Message=Gateway timeout, Details=[]"),
status.New(codes.InvalidArgument, "Gateway timeout").Err(),
time.Duration(0)*time.Second),
},
}
Expand Down Expand Up @@ -532,7 +531,7 @@ func TestErrorResponses(t *testing.T) {
if test.isPermErr {
assert.True(t, consumererror.IsPermanent(err))
} else {
assert.EqualValues(t, test.err, err)
assert.EqualValues(t, test.err.Error(), err.Error())
}

srv.Close()
Expand Down
45 changes: 45 additions & 0 deletions internal/errs/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errs

import "fmt"

// RequestError represents an error returned during HTTP client operations
type RequestError struct {
statusCode int
message string
args []any
}

// NewRequestError creates a new HTTP Client Request error with the given parameters
func NewRequestError(statusCode int, message string, args ...any) *RequestError {
return &RequestError{
message: message,
args: args,
statusCode: statusCode,
}
}

func (r *RequestError) Error() string {
return r.Message()
}

func (r *RequestError) Message() string {
return fmt.Sprintf(r.message, r.args...)
}

func (r *RequestError) StatusCode() int {
return r.statusCode
}
61 changes: 61 additions & 0 deletions receiver/otlpreceiver/erroradapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlpreceiver"

import (
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func toHTTP(s *status.Status) int {
switch s.Code() {
case codes.Aborted:
return http.StatusInternalServerError
case codes.AlreadyExists:
return http.StatusConflict
case codes.Canceled:
return http.StatusInternalServerError
case codes.DataLoss:
return http.StatusInternalServerError
case codes.DeadlineExceeded:
return http.StatusRequestTimeout
case codes.FailedPrecondition:
return http.StatusPreconditionFailed
case codes.Internal:
return http.StatusInternalServerError
case codes.InvalidArgument:
return http.StatusBadRequest
case codes.NotFound:
return http.StatusNotFound
case codes.OutOfRange:
return http.StatusBadRequest
case codes.PermissionDenied:
return http.StatusForbidden
case codes.ResourceExhausted:
return http.StatusInternalServerError
case codes.Unauthenticated:
return http.StatusUnauthorized
case codes.Unavailable:
return http.StatusServiceUnavailable
case codes.Unimplemented:
return http.StatusNotFound
case codes.Unknown:
return http.StatusInternalServerError
default:
return http.StatusInternalServerError
}
}
24 changes: 18 additions & 6 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,10 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco

func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
name string
encoding string
err error
statusCode int
}{
{
name: "ProtoUncompressed",
Expand All @@ -448,6 +449,12 @@ func TestProtoHttp(t *testing.T) {
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
{
name: "Invalid argument becomes bad request",
encoding: "",
err: status.New(codes.InvalidArgument, "").Err(),
statusCode: http.StatusBadRequest,
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand All @@ -471,7 +478,7 @@ func TestProtoHttp(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/v1/traces", addr)
tSink.Reset()
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td)
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td, test.statusCode)
})
}
}
Expand Down Expand Up @@ -506,7 +513,12 @@ func testHTTPProtobufRequest(
traceBytes []byte,
expectedErr error,
wantData ptrace.Traces,
httpStatusCode int,
) {
if httpStatusCode == 0 {
httpStatusCode = http.StatusInternalServerError
}

tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)
Expand Down Expand Up @@ -535,10 +547,10 @@ func testHTTPProtobufRequest(
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(expectedErr); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, httpStatusCode, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, httpStatusCode, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
require.Len(t, allTraces, 0)
Expand Down
15 changes: 14 additions & 1 deletion receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/errs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
Expand All @@ -47,7 +48,19 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t

otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq)
if err != nil {
writeError(resp, encoder, err, http.StatusInternalServerError)
httpStatus := http.StatusInternalServerError // the default status on errors

// perhaps we had a gRPC exporter for this data returning an error?
if s, ok := status.FromError(err); ok {
httpStatus = toHTTP(s)
}

// perhaps it was an HTTP exporter that failed?
if s, ok := err.(*errs.RequestError); ok {
httpStatus = s.StatusCode()
}

writeError(resp, encoder, err, httpStatus)
return
}

Expand Down

0 comments on commit 0e2400a

Please sign in to comment.