From 8af36859c507bd2585143cbd43b30cc27fd90a1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 13 Jun 2024 18:35:41 +0200 Subject: [PATCH] Cherry-pick #8363 to r294 (#8367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix reporting of HTTP error messages with binary body content (#8363) * Fix handling of HTTP error responses with binary data. Signed-off-by: Peter Štibraný * Update dskit, address review feedback. Signed-off-by: Peter Štibraný * CHANGELOG.md Signed-off-by: Peter Štibraný * Fix linter errors. Signed-off-by: Peter Štibraný * Fix go.mod Signed-off-by: Peter Štibraný * Address review feedback. Signed-off-by: Peter Štibraný --------- Signed-off-by: Peter Štibraný * Fix go.mod. Signed-off-by: Peter Štibraný --------- Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- pkg/distributor/otel.go | 7 +- pkg/distributor/push.go | 13 +- pkg/distributor/push_test.go | 213 ++++++++++++++++++ .../grafana/dskit/httpgrpc/httpgrpc.go | 10 +- .../grafana/dskit/httpgrpc/server/server.go | 37 ++- vendor/modules.txt | 2 +- 9 files changed, 275 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7aaed3a725..3b08afab823 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ * [BUGFIX] Query-frontend: fix splitting of queries using `@ start()` and `@end()` modifiers on a subquery. Previously the `start()` and `end()` would be evaluated using the start end end of the split query instead of the original query. #8162 * [BUGFIX] Distributor: Don't discard time series with invalid exemplars, just drop affected exemplars. #8224 * [BUGFIX] Ingester: fixed in-memory series count when replaying a corrupted WAL. #8295 +* [BUGFIX] OTLP handler: fix errors returned by OTLP handler when used via httpgrpc tunneling. 
#8363 ### Mixin diff --git a/go.mod b/go.mod index c3c63c1c68e..89e9bc2faf3 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 + github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 709c1017b65..67caf3833a7 100644 --- a/go.sum +++ b/go.sum @@ -507,8 +507,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240605124151-5d695b88086a h1:7TdeZQkS8D+jaAikdcl3EVDrm6QaoEXrJIJKaBcPkzE= github.com/grafana/alerting v0.0.0-20240605124151-5d695b88086a/go.mod h1:McUkQuCHiARiXzM3n/69Gduyw0UaCYJKfu2SR4znomQ= -github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 h1:k8vINlI4w+RYc37NRwQlRe/IHYoEbu6KAe2XdGDeV1U= -github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc= +github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54 h1:o3oKDQKvw5e9GZBKR0s2qaNZAYGXlWk8hPdjrdvcD3I= +github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM= diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index d01ce91c2b9..92e5dcf0485 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ 
-41,13 +41,18 @@ const ( maxErrMsgLen = 1024 ) +type OTLPHandlerLimits interface { + OTelMetricSuffixesEnabled(id string) bool + ServiceOverloadStatusCodeOnRateLimitEnabled(id string) bool +} + // OTLPHandler is an http.Handler accepting OTLP write requests. func OTLPHandler( maxRecvMsgSize int, requestBufferPool util.Pool, sourceIPs *middleware.SourceIPExtractor, enableOtelMetadataStorage bool, - limits *validation.Overrides, + limits OTLPHandlerLimits, retryCfg RetryConfig, push PushFunc, pushMetrics *PushMetrics, diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index 1c989af6cd6..b8b970a4637 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -194,7 +194,7 @@ func otlpHandler( maxRecvMsgSize int, requestBufferPool util.Pool, sourceIPs *middleware.SourceIPExtractor, - limits *validation.Overrides, + limits OTLPHandlerLimits, retryCfg RetryConfig, push PushFunc, logger log.Logger, @@ -232,7 +232,7 @@ func otlpHandler( if err := push(ctx, req); err != nil { if errors.Is(err, context.Canceled) { level.Warn(logger).Log("msg", "push request canceled", "err", err) - writeErrorToHTTPResponseBody(w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger) + writeErrorToHTTPResponseBody(r.Context(), w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger) return } var ( @@ -259,16 +259,19 @@ func otlpHandler( level.Error(logger).Log(msgs...) } addHeaders(w, err, r, httpCode, retryCfg) - writeErrorToHTTPResponseBody(w, httpCode, grpcCode, errorMsg, logger) + writeErrorToHTTPResponseBody(r.Context(), w, httpCode, grpcCode, errorMsg, logger) } }) } // writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body. 
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1 -func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) { +func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) { w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("X-Content-Type-Options", "nosniff") + if server.IsHandledByHttpgrpcServer(reqCtx) { + w.Header().Set(server.ErrorMessageHeaderKey, msg) // If the httpgrpc Server converts this HTTP response into an error, it should use this message instead of the response body. + } w.WriteHeader(httpCode) respBytes, err := proto.Marshal(grpcstatus.New(grpcCode, msg).Proto()) @@ -306,7 +309,7 @@ func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffE // toGRPCHTTPStatus converts the given error into an appropriate GRPC and HTTP status corresponding // to that error, if the error is one of the errors from this package. Otherwise, codes.Internal and // http.StatusInternalServerError is returned. 
-func toGRPCHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrides) (codes.Code, int) { +func toGRPCHTTPStatus(ctx context.Context, pushErr error, limits OTLPHandlerLimits) (codes.Code, int) { if errors.Is(pushErr, context.DeadlineExceeded) { return codes.Internal, http.StatusInternalServerError } diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 500b3d014c1..d637728068a 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -9,12 +9,15 @@ import ( "bytes" "compress/gzip" "context" + "flag" "fmt" "io" "net/http" "net/http/httptest" + "sort" "strconv" "strings" + "sync" "testing" "time" @@ -27,8 +30,10 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/middleware" + dskit_server "github.com/grafana/dskit/server" "github.com/grafana/dskit/user" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" @@ -38,7 +43,10 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "github.com/grafana/mimir/pkg/ingester/client" @@ -1343,3 +1351,208 @@ func TestRetryConfig_Validate(t *testing.T) { }) } } + +func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := dskit_server.Config{} + // Set default values + cfg.RegisterFlags(flag.NewFlagSet("test", flag.ContinueOnError)) + + // Configure values for test. 
+ cfg.HTTPListenAddress = "localhost" + cfg.HTTPListenPort = 0 // auto-assign + cfg.GRPCListenAddress = "localhost" + cfg.GRPCListenPort = 0 // auto-assign + cfg.Registerer = reg + cfg.Gatherer = reg + cfg.ReportHTTP4XXCodesInInstrumentationLabel = true // report 400 as errors. + cfg.GRPCMiddleware = []grpc.UnaryServerInterceptor{middleware.ServerUserHeaderInterceptor} + cfg.HTTPMiddleware = []middleware.Interface{middleware.AuthenticateUser} + + srv, err := dskit_server.New(cfg) + require.NoError(t, err) + + push := func(ctx context.Context, req *Request) error { + // Trigger conversion of incoming request to WriteRequest. + wr, err := req.WriteRequest() + if err != nil { + return err + } + + if len(wr.Timeseries) > 0 && len(wr.Timeseries[0].Labels) > 0 && wr.Timeseries[0].Labels[0].Name == "__name__" && wr.Timeseries[0].Labels[0].Value == "report_server_error" { + return errors.New("some random push error") + } + + return nil + } + h := OTLPHandler(200, util.NewBufferPool(), nil, false, otlpLimitsMock{}, RetryConfig{Enabled: false}, push, newPushMetrics(reg), reg, log.NewNopLogger(), true) + srv.HTTP.Handle("/otlp", h) + + // start the server + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { defer wg.Done(); _ = srv.Run() }() + t.Cleanup(func() { + srv.Stop() + wg.Wait() + }) + + // create client + conn, err := grpc.NewClient(srv.GRPCListenAddr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(middleware.ClientUserHeaderInterceptor)) + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + + type testCase struct { + request *httpgrpc.HTTPRequest + expectedResponse *httpgrpc.HTTPResponse + expectedGrpcErrorMessage string + } + + testcases := map[string]testCase{ + "missing content type returns 415": { + request: &httpgrpc.HTTPRequest{ + Method: "POST", + Url: "/otlp", + Body: []byte("hello"), + }, + expectedResponse: &httpgrpc.HTTPResponse{Code: 415, + Headers: 
[]*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/octet-stream"}}, + {Key: "X-Content-Type-Options", Values: []string{"nosniff"}}, + }, + Body: mustMarshalStatus(t, 415, "unsupported content type: , supported: [application/json, application/x-protobuf]"), + }, + expectedGrpcErrorMessage: "rpc error: code = Code(415) desc = unsupported content type: , supported: [application/json, application/x-protobuf]", + }, + + "invalid JSON request returns 400": { + request: &httpgrpc.HTTPRequest{ + Method: "POST", + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/json"}}, + }, + Url: "/otlp", + Body: []byte("invalid"), + }, + expectedResponse: &httpgrpc.HTTPResponse{Code: 400, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/octet-stream"}}, + {Key: "X-Content-Type-Options", Values: []string{"nosniff"}}, + }, + Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|..."), + }, + expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...", + }, + + "empty JSON is good request, with 200 status code": { + request: &httpgrpc.HTTPRequest{ + Method: "POST", + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/json"}}, + }, + Url: "/otlp", + Body: []byte("{}"), + }, + expectedResponse: &httpgrpc.HTTPResponse{Code: 200, + Headers: nil, // No headers expected for 200. + Body: nil, // No body expected for 200 code. + }, + expectedGrpcErrorMessage: "", // No error expected + }, + + "trigger 5xx error by sending special metric": { + request: &httpgrpc.HTTPRequest{ + Method: "POST", + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/json"}}, + }, + Url: "/otlp", + // This is simple OTLP request, with "report_server_error". 
+ Body: []byte(`{"resourceMetrics": [{"scopeMetrics": [{"metrics": [{"name": "report_server_error", "gauge": {"dataPoints": [{"timeUnixNano": "1679912463340000000", "asDouble": 10.66}]}}]}]}]}`), + }, + expectedResponse: &httpgrpc.HTTPResponse{Code: 500, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/octet-stream"}}, + {Key: "X-Content-Type-Options", Values: []string{"nosniff"}}, + }, + Body: mustMarshalStatus(t, codes.Internal, "some random push error"), + }, + expectedGrpcErrorMessage: "rpc error: code = Code(500) desc = some random push error", + }, + } + + hc := httpgrpc.NewHTTPClient(conn) + httpClient := http.Client{} + + for name, tc := range testcases { + t.Run(fmt.Sprintf("grpc: %s", name), func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := hc.Handle(ctx, tc.request) + + if err != nil { + require.EqualError(t, err, tc.expectedGrpcErrorMessage) + + errresp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "errors reported by OTLP handler should always be convertible to HTTP response") + resp = errresp + } else if tc.expectedGrpcErrorMessage != "" { + require.Failf(t, "expected error message %q, but got no error", tc.expectedGrpcErrorMessage) + } + + // Before comparing response, we sort headers, to keep comparison stable. 
+ sort.Slice(resp.Headers, func(i, j int) bool { + return resp.Headers[i].Key < resp.Headers[j].Key + }) + require.Equal(t, tc.expectedResponse, resp) + }) + + t.Run(fmt.Sprintf("http: %s", name), func(t *testing.T) { + req, err := httpgrpc.ToHTTPRequest(context.Background(), tc.request) + require.NoError(t, err) + + req.Header.Add("X-Scope-OrgID", "test") + req.RequestURI = "" + req.URL.Scheme = "http" + req.URL.Host = srv.HTTPListenAddr().String() + + resp, err := httpClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + if len(body) == 0 { + body = nil // to simplify test + } + + // Verify that body is the same as we expect through gRPC. + require.Equal(t, tc.expectedResponse.Body, body) + + // Verify that expected headers are in the response. + for _, h := range tc.expectedResponse.Headers { + assert.Equal(t, h.Values, resp.Header.Values(h.Key)) + } + + // Verify that header that indicates grpc error for httpgrpc.Server is not in the response. 
+ assert.Empty(t, resp.Header.Get(server.ErrorMessageHeaderKey)) + }) + } +} + +func mustMarshalStatus(t *testing.T, code codes.Code, msg string) []byte { + bytes, err := proto.Marshal(grpcstatus.New(code, msg).Proto()) + require.NoError(t, err) + return bytes +} + +type otlpLimitsMock struct{} + +func (o otlpLimitsMock) ServiceOverloadStatusCodeOnRateLimitEnabled(_ string) bool { + return false +} + +func (o otlpLimitsMock) OTelMetricSuffixesEnabled(_ string) bool { + return false +} diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index b755e2adcea..02e6e493736 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -116,8 +116,14 @@ func Errorf(code int, tmpl string, args ...interface{}) error { }) } -// ErrorFromHTTPResponse converts an HTTP response into a grpc error +// ErrorFromHTTPResponse converts an HTTP response into a grpc error, and uses HTTP response body as an error message. +// Note that if HTTP response body contains non-utf8 string, then returned error cannot be marshalled by protobuf. func ErrorFromHTTPResponse(resp *HTTPResponse) error { + return ErrorFromHTTPResponseWithMessage(resp, string(resp.Body)) +} + +// ErrorFromHTTPResponseWithMessage converts an HTTP response into a grpc error, and uses supplied message for Error message. 
+func ErrorFromHTTPResponseWithMessage(resp *HTTPResponse, msg string) error { a, err := types.MarshalAny(resp) if err != nil { return err @@ -125,7 +131,7 @@ func ErrorFromHTTPResponse(resp *HTTPResponse) error { return status.ErrorProto(&spb.Status{ Code: resp.Code, - Message: string(resp.Body), + Message: msg, Details: []*types.Any{a}, }) } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go index b73c5a0f775..6a831dac0f8 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go @@ -26,12 +26,22 @@ import ( ) var ( - // DoNotLogErrorHeaderKey is a header key used for marking non-loggable errors. More precisely, if an HTTP response + // DoNotLogErrorHeaderKey is a header name used for marking non-loggable errors. More precisely, if an HTTP response // has a status code 5xx, and contains a header with key DoNotLogErrorHeaderKey and any values, the generated error // will be marked as non-loggable. DoNotLogErrorHeaderKey = http.CanonicalHeaderKey("X-DoNotLogError") + + // ErrorMessageHeaderKey is a header name for header that contains error message that should be used when Server.Handle + // (httpgrpc.HTTP/Handle implementation) decides to return the response as an error, using status.ErrorProto. + // Normally Server.Handle would use entire response body as a error message, but Message field of rcp.Status object + // is a string, and if body contains non-utf8 bytes, marshalling of this object will fail. + ErrorMessageHeaderKey = http.CanonicalHeaderKey("X-ErrorMessage") ) +type contextType int + +const handledByHttpgrpcServer contextType = 0 + type Option func(*Server) func WithReturn4XXErrors(s *Server) { @@ -59,6 +69,8 @@ func NewServer(handler http.Handler, opts ...Option) *Server { // Handle implements HTTPServer. 
func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + ctx = context.WithValue(ctx, handledByHttpgrpcServer, true) + req, err := httpgrpc.ToHTTPRequest(ctx, r) if err != nil { return nil, err @@ -74,13 +86,24 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc. header.Del(DoNotLogErrorHeaderKey) // remove before converting to httpgrpc resp } + errorMessageFromHeader := "" + if msg, ok := header[ErrorMessageHeaderKey]; ok { + errorMessageFromHeader = msg[0] + header.Del(ErrorMessageHeaderKey) // remove before converting to httpgrpc resp + } + resp := &httpgrpc.HTTPResponse{ Code: int32(recorder.Code), Headers: httpgrpc.FromHeader(header), Body: recorder.Body.Bytes(), } if s.shouldReturnError(resp) { - err := httpgrpc.ErrorFromHTTPResponse(resp) + var err error + if errorMessageFromHeader != "" { + err = httpgrpc.ErrorFromHTTPResponseWithMessage(resp, errorMessageFromHeader) + } else { + err = httpgrpc.ErrorFromHTTPResponse(resp) + } if doNotLogError { err = middleware.DoNotLogError{Err: err} } @@ -206,3 +229,13 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } + +// IsHandledByHttpgrpcServer returns true if context is associated with HTTP request that was initiated by +// Server.Handle, which is an implementation of httpgrpc.HTTP/Handle gRPC method. 
+func IsHandledByHttpgrpcServer(ctx context.Context) bool { + val := ctx.Value(handledByHttpgrpcServer) + if v, ok := val.(bool); ok { + return v + } + return false +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5cfdb500ab2..2f8a6edd583 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -596,7 +596,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 +# github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54 ## explicit; go 1.20 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast