Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Commit

Permalink
Handle partial success responses in the OTLP HTTP exporter (#6970)
Browse files Browse the repository at this point in the history
Handle partial success messages returned from OTLP HTTP backends.

Fixes
open-telemetry/opentelemetry-collector#6686

---------

Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
  • Loading branch information
3 people authored Jun 27, 2023
1 parent 04f889b commit 5852d09
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 95 deletions.
16 changes: 16 additions & 0 deletions .chloggen/otlphttp-partial-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Treat partial success responses as errors

# One or more tracking issues or pull requests related to the change
issues: [6686]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
121 changes: 101 additions & 20 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.tracesURL, request)
return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler)
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
Expand All @@ -99,7 +99,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
if err != nil {
return consumererror.NewPermanent(err)
}
return e.export(ctx, e.metricsURL, request)
return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler)
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
Expand All @@ -109,10 +109,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.logsURL, request)
return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler)
}

func (e *baseExporter) export(ctx context.Context, url string, request []byte) error {
func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error {
e.logger.Debug("Preparing to make HTTP request", zap.String("url", url))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request))
if err != nil {
Expand All @@ -133,11 +133,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e
}()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// Request is successful.
return nil
return handlePartialSuccessResponse(resp, partialSuccessHandler)
}

respStatus := readResponse(resp)
respStatus := readResponseStatus(resp)

// Format the error message. Use the status if it is present in the response.
var formattedErr error
Expand Down Expand Up @@ -188,29 +187,111 @@ func isRetryableStatusCode(code int) bool {
}
}

// readResponseBody reads the body of resp, capped at maxHTTPResponseReadBytes,
// and returns the bytes that were read. It returns (nil, nil) when the
// response has no body, and a non-nil error only when reading fails for a
// reason other than the body being shorter than the read buffer.
func readResponseBody(resp *http.Response) ([]byte, error) {
	limit := resp.ContentLength
	if limit == 0 {
		return nil, nil
	}

	// A ContentLength of -1 means the length is unknown, so fall back to the
	// maximum permitted body size. A declared length above the maximum is
	// clamped as well: we still read what we can in case the body describes an
	// error, although unmarshaling a truncated message will likely fail later.
	switch {
	case limit == -1, limit > maxHTTPResponseReadBytes:
		limit = maxHTTPResponseReadBytes
	}

	buf := make([]byte, limit)
	read, err := io.ReadFull(resp.Body, buf)

	switch {
	case read == 0 && (err == nil || errors.Is(err, io.EOF)):
		// Nothing was read and the stream ended cleanly: there is no body.
		return nil, nil
	case err != nil && !errors.Is(err, io.ErrUnexpectedEOF):
		// io.ErrUnexpectedEOF is tolerated because io.ReadFull reports it
		// whenever the body ends before the buffer is full, which is expected
		// when the length was unknown and the buffer was sized to the maximum.
		// Any other error is a genuine read failure.
		return nil, err
	default:
		return buf[:read], nil
	}
}

// Read the response and decode the status.Status from the body.
// Returns nil if the response is empty or cannot be decoded.
func readResponse(resp *http.Response) *status.Status {
func readResponseStatus(resp *http.Response) *status.Status {
var respStatus *status.Status
if resp.StatusCode >= 400 && resp.StatusCode <= 599 {
// Request failed. Read the body. OTLP spec says:
// "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a
// Protobuf-encoded Status message that describes the problem."
maxRead := resp.ContentLength
if maxRead == -1 || maxRead > maxHTTPResponseReadBytes {
maxRead = maxHTTPResponseReadBytes
respBytes, err := readResponseBody(resp)

if err != nil {
return nil
}
respBytes := make([]byte, maxRead)
n, err := io.ReadFull(resp.Body, respBytes)
if err == nil && n > 0 {
// Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures
respStatus = &status.Status{}
err = proto.Unmarshal(respBytes, respStatus)
if err != nil {
respStatus = nil
}

// Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures
respStatus = &status.Status{}
err = proto.Unmarshal(respBytes, respStatus)
if err != nil {
return nil
}
}

return respStatus
}

// handlePartialSuccessResponse reads the body of resp and hands the raw bytes
// to partialSuccessHandler, returning whatever that handler returns. If the
// body cannot be read, the read error is returned and the handler is not
// invoked.
func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error {
	body, err := readResponseBody(resp)
	if err != nil {
		return err
	}
	return partialSuccessHandler(body)
}

// partialSuccessHandler is the signature of the per-signal callbacks (traces,
// metrics, logs) that handlePartialSuccessResponse invokes with the bytes read
// from the response body. The implementations in this file return an error
// when those bytes cannot be decoded or when the decoded response reports a
// partial success, and nil otherwise.
type partialSuccessHandler func(protoBytes []byte) error

// tracesPartialSuccessHandler decodes protoBytes as a trace export response
// and checks its partial-success section.
//
// It returns nil when the response carries neither an error message nor any
// rejected spans. If either is present, it returns an error built with
// consumererror.NewPermanent that includes the message and the rejected count.
// If protoBytes cannot be decoded, the decode error is returned wrapped with
// context (and is not marked permanent).
func tracesPartialSuccessHandler(protoBytes []byte) error {
	exportResponse := ptraceotlp.NewExportResponse()
	if err := exportResponse.UnmarshalProto(protoBytes); err != nil {
		// Add context so the caller can tell this came from decoding the body
		// of an otherwise successful response.
		return fmt.Errorf("parsing OTLP traces export response: %w", err)
	}

	partialSuccess := exportResponse.PartialSuccess()
	if partialSuccess.ErrorMessage() != "" || partialSuccess.RejectedSpans() != 0 {
		return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans()))
	}
	return nil
}

// metricsPartialSuccessHandler decodes protoBytes as a metrics export response
// and checks its partial-success section.
//
// It returns nil when the response carries neither an error message nor any
// rejected data points. If either is present, it returns an error built with
// consumererror.NewPermanent that includes the message and the rejected count.
// If protoBytes cannot be decoded, the decode error is returned wrapped with
// context (and is not marked permanent).
func metricsPartialSuccessHandler(protoBytes []byte) error {
	exportResponse := pmetricotlp.NewExportResponse()
	if err := exportResponse.UnmarshalProto(protoBytes); err != nil {
		// Add context so the caller can tell this came from decoding the body
		// of an otherwise successful response.
		return fmt.Errorf("parsing OTLP metrics export response: %w", err)
	}

	partialSuccess := exportResponse.PartialSuccess()
	if partialSuccess.ErrorMessage() != "" || partialSuccess.RejectedDataPoints() != 0 {
		return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints()))
	}
	return nil
}

// logsPartialSuccessHandler decodes protoBytes as a logs export response and
// checks its partial-success section.
//
// It returns nil when the response carries neither an error message nor any
// rejected log records. If either is present, it returns an error built with
// consumererror.NewPermanent that includes the message and the rejected count.
// If protoBytes cannot be decoded, the decode error is returned wrapped with
// context (and is not marked permanent).
func logsPartialSuccessHandler(protoBytes []byte) error {
	exportResponse := plogotlp.NewExportResponse()
	if err := exportResponse.UnmarshalProto(protoBytes); err != nil {
		// Add context so the caller can tell this came from decoding the body
		// of an otherwise successful response.
		return fmt.Errorf("parsing OTLP logs export response: %w", err)
	}

	partialSuccess := exportResponse.PartialSuccess()
	if partialSuccess.ErrorMessage() != "" || partialSuccess.RejectedLogRecords() != 0 {
		return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords()))
	}
	return nil
}
Loading

0 comments on commit 5852d09

Please sign in to comment.