Skip to content

Commit

Permalink
Distributor: split otlp related functions from push (#8328)
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne authored Jun 10, 2024
1 parent 6cc0bf7 commit 3917679
Show file tree
Hide file tree
Showing 4 changed files with 722 additions and 713 deletions.
126 changes: 126 additions & 0 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/tenant"
Expand All @@ -25,10 +27,13 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/distributor/otlp"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
utillog "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -198,6 +203,127 @@ func OTLPHandler(
})
}

func otlpHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
retryCfg RetryConfig,
push PushFunc,
logger log.Logger,
parser parserFunc,
) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := utillog.WithContext(ctx, logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
logger = utillog.WithSourceIPs(source, logger)
}
}
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

rb.CleanUp()
return nil, nil, err
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)
if err := push(ctx, req); err != nil {
if errors.Is(err, context.Canceled) {
level.Warn(logger).Log("msg", "push request canceled", "err", err)
writeErrorToHTTPResponseBody(w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
return
}
var (
httpCode int
grpcCode codes.Code
errorMsg string
)
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
// here the error would always be nil, since it is already checked in httpgrpc.HTTPResponseFromError
s, _ := grpcutil.ErrorToStatus(err)
httpCode = int(resp.Code)
grpcCode = s.Code() // this will be the same as httpCode.
errorMsg = string(resp.Body)
} else {
grpcCode, httpCode = toOtlpGRPCHTTPStatus(err)
errorMsg = err.Error()
}
if httpCode != 202 {
// This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
msgs := []interface{}{"msg", "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", err}
if httpCode/100 == 4 {
msgs = append(msgs, "insight", true)
}
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(w, httpCode, grpcCode, errorMsg, logger)
}
})
}

// toOtlpGRPCHTTPStatus is utilized by the OTLP endpoint.
// According to the OTLP specifications (https://opentelemetry.io/docs/specs/otlp/#failures-1), unlike Prometheus, the OTLP client only retries on HTTP status codes 429, 502, 503, and 504.
func toOtlpGRPCHTTPStatus(pushErr error) (codes.Code, int) {
if !errors.Is(pushErr, context.DeadlineExceeded) {
var distributorErr Error
if errors.As(pushErr, &distributorErr) {
switch distributorErr.Cause() {
case mimirpb.BAD_DATA:
return codes.InvalidArgument, http.StatusBadRequest
case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED:
return codes.ResourceExhausted, http.StatusTooManyRequests
case mimirpb.REPLICAS_DID_NOT_MATCH:
return codes.OK, http.StatusAccepted
case mimirpb.TOO_MANY_CLUSTERS:
return codes.InvalidArgument, http.StatusBadRequest
case mimirpb.TSDB_UNAVAILABLE:
return codes.Unavailable, http.StatusServiceUnavailable
case mimirpb.CIRCUIT_BREAKER_OPEN:
return codes.Unavailable, http.StatusServiceUnavailable
case mimirpb.METHOD_NOT_ALLOWED:
return codes.Unimplemented, http.StatusNotImplemented
}
}
}
return codes.Internal, http.StatusServiceUnavailable
}

// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(grpcstatus.New(grpcCode, msg).Proto())
if err != nil {
level.Error(logger).Log("msg", "otlp response marshal failed", "err", err)
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(codes.Internal, "failed to marshal OTLP response").Proto())
_, _ = w.Write(writeResponseFailedBody)
return
}

_, err = w.Write(respBytes)
if err != nil {
level.Error(logger).Log("msg", "write error to otlp response failed", "err", err)
}
}

// otlpProtoUnmarshaler implements proto.Message wrapping pmetricotlp.ExportRequest.
type otlpProtoUnmarshaler struct {
request *pmetricotlp.ExportRequest
Expand Down
Loading

0 comments on commit 3917679

Please sign in to comment.