Skip to content

Commit

Permalink
Cherry-pick #8363 to r294 (#8367)
Browse files Browse the repository at this point in the history
* Fix reporting of HTTP error messages with binary body content (#8363)

* Fix handling of HTTP error responses with binary data.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Update dskit, address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix linter errors.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix go.mod

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix go.mod.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
pstibrany authored Jun 13, 2024
1 parent 411c609 commit 8af3685
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
* [BUGFIX] Query-frontend: fix splitting of queries using `@ start()` and `@end()` modifiers on a subquery. Previously the `start()` and `end()` would be evaluated using the start end end of the split query instead of the original query. #8162
* [BUGFIX] Distributor: Don't discard time series with invalid exemplars, just drop affected exemplars. #8224
* [BUGFIX] Ingester: fixed in-memory series count when replaying a corrupted WAL. #8295
* [BUGFIX] OTLP handler: fix errors returned by OTLP handler when used via httpgrpc tunneling. #8363

### Mixin

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3
github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20240605124151-5d695b88086a h1:7TdeZQkS8D+jaAikdcl3EVDrm6QaoEXrJIJKaBcPkzE=
github.com/grafana/alerting v0.0.0-20240605124151-5d695b88086a/go.mod h1:McUkQuCHiARiXzM3n/69Gduyw0UaCYJKfu2SR4znomQ=
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 h1:k8vINlI4w+RYc37NRwQlRe/IHYoEbu6KAe2XdGDeV1U=
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc=
github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54 h1:o3oKDQKvw5e9GZBKR0s2qaNZAYGXlWk8hPdjrdvcD3I=
github.com/grafana/dskit v0.0.0-20240613153104-a26a26cbbb54/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
Expand Down
7 changes: 6 additions & 1 deletion pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ const (
maxErrMsgLen = 1024
)

type OTLPHandlerLimits interface {
OTelMetricSuffixesEnabled(id string) bool
ServiceOverloadStatusCodeOnRateLimitEnabled(id string) bool
}

// OTLPHandler is an http.Handler accepting OTLP write requests.
func OTLPHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
enableOtelMetadataStorage bool,
limits *validation.Overrides,
limits OTLPHandlerLimits,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
Expand Down
13 changes: 8 additions & 5 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func otlpHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
limits *validation.Overrides,
limits OTLPHandlerLimits,
retryCfg RetryConfig,
push PushFunc,
logger log.Logger,
Expand Down Expand Up @@ -232,7 +232,7 @@ func otlpHandler(
if err := push(ctx, req); err != nil {
if errors.Is(err, context.Canceled) {
level.Warn(logger).Log("msg", "push request canceled", "err", err)
writeErrorToHTTPResponseBody(w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
writeErrorToHTTPResponseBody(r.Context(), w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
return
}
var (
Expand All @@ -259,16 +259,19 @@ func otlpHandler(
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(w, httpCode, grpcCode, errorMsg, logger)
writeErrorToHTTPResponseBody(r.Context(), w, httpCode, grpcCode, errorMsg, logger)
}
})
}

// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
if server.IsHandledByHttpgrpcServer(reqCtx) {
w.Header().Set(server.ErrorMessageHeaderKey, msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
}
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(grpcstatus.New(grpcCode, msg).Proto())
Expand Down Expand Up @@ -306,7 +309,7 @@ func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffE
// toGRPCHTTPStatus converts the given error into an appropriate GRPC and HTTP status corresponding
// to that error, if the error is one of the errors from this package. Otherwise, codes.Internal and
// http.StatusInternalServerError is returned.
func toGRPCHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrides) (codes.Code, int) {
func toGRPCHTTPStatus(ctx context.Context, pushErr error, limits OTLPHandlerLimits) (codes.Code, int) {
if errors.Is(pushErr, context.DeadlineExceeded) {
return codes.Internal, http.StatusInternalServerError
}
Expand Down
213 changes: 213 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"bytes"
"compress/gzip"
"context"
"flag"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -27,8 +30,10 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
dskit_server "github.com/grafana/dskit/server"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
Expand All @@ -38,7 +43,10 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -1343,3 +1351,208 @@ func TestRetryConfig_Validate(t *testing.T) {
})
}
}

func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {
reg := prometheus.NewRegistry()
cfg := dskit_server.Config{}
// Set default values
cfg.RegisterFlags(flag.NewFlagSet("test", flag.ContinueOnError))

// Configure values for test.
cfg.HTTPListenAddress = "localhost"
cfg.HTTPListenPort = 0 // auto-assign
cfg.GRPCListenAddress = "localhost"
cfg.GRPCListenPort = 0 // auto-assign
cfg.Registerer = reg
cfg.Gatherer = reg
cfg.ReportHTTP4XXCodesInInstrumentationLabel = true // report 400 as errors.
cfg.GRPCMiddleware = []grpc.UnaryServerInterceptor{middleware.ServerUserHeaderInterceptor}
cfg.HTTPMiddleware = []middleware.Interface{middleware.AuthenticateUser}

srv, err := dskit_server.New(cfg)
require.NoError(t, err)

push := func(ctx context.Context, req *Request) error {
// Trigger conversion of incoming request to WriteRequest.
wr, err := req.WriteRequest()
if err != nil {
return err
}

if len(wr.Timeseries) > 0 && len(wr.Timeseries[0].Labels) > 0 && wr.Timeseries[0].Labels[0].Name == "__name__" && wr.Timeseries[0].Labels[0].Value == "report_server_error" {
return errors.New("some random push error")
}

return nil
}
h := OTLPHandler(200, util.NewBufferPool(), nil, false, otlpLimitsMock{}, RetryConfig{Enabled: false}, push, newPushMetrics(reg), reg, log.NewNopLogger(), true)
srv.HTTP.Handle("/otlp", h)

// start the server
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() { defer wg.Done(); _ = srv.Run() }()
t.Cleanup(func() {
srv.Stop()
wg.Wait()
})

// create client
conn, err := grpc.NewClient(srv.GRPCListenAddr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(middleware.ClientUserHeaderInterceptor))
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })

type testCase struct {
request *httpgrpc.HTTPRequest
expectedResponse *httpgrpc.HTTPResponse
expectedGrpcErrorMessage string
}

testcases := map[string]testCase{
"missing content type returns 415": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Url: "/otlp",
Body: []byte("hello"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 415,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 415, "unsupported content type: , supported: [application/json, application/x-protobuf]"),
},
expectedGrpcErrorMessage: "rpc error: code = Code(415) desc = unsupported content type: , supported: [application/json, application/x-protobuf]",
},

"invalid JSON request returns 400": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
Body: []byte("invalid"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 400,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|..."),
},
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...",
},

"empty JSON is good request, with 200 status code": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
Body: []byte("{}"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 200,
Headers: nil, // No headers expected for 200.
Body: nil, // No body expected for 200 code.
},
expectedGrpcErrorMessage: "", // No error expected
},

"trigger 5xx error by sending special metric": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
// This is simple OTLP request, with "report_server_error".
Body: []byte(`{"resourceMetrics": [{"scopeMetrics": [{"metrics": [{"name": "report_server_error", "gauge": {"dataPoints": [{"timeUnixNano": "1679912463340000000", "asDouble": 10.66}]}}]}]}]}`),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 500,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, codes.Internal, "some random push error"),
},
expectedGrpcErrorMessage: "rpc error: code = Code(500) desc = some random push error",
},
}

hc := httpgrpc.NewHTTPClient(conn)
httpClient := http.Client{}

for name, tc := range testcases {
t.Run(fmt.Sprintf("grpc: %s", name), func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := hc.Handle(ctx, tc.request)

if err != nil {
require.EqualError(t, err, tc.expectedGrpcErrorMessage)

errresp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "errors reported by OTLP handler should always be convertible to HTTP response")
resp = errresp
} else if tc.expectedGrpcErrorMessage != "" {
require.Failf(t, "expected error message %q, but got no error", tc.expectedGrpcErrorMessage)
}

// Before comparing response, we sort headers, to keep comparison stable.
sort.Slice(resp.Headers, func(i, j int) bool {
return resp.Headers[i].Key < resp.Headers[j].Key
})
require.Equal(t, tc.expectedResponse, resp)
})

t.Run(fmt.Sprintf("http: %s", name), func(t *testing.T) {
req, err := httpgrpc.ToHTTPRequest(context.Background(), tc.request)
require.NoError(t, err)

req.Header.Add("X-Scope-OrgID", "test")
req.RequestURI = ""
req.URL.Scheme = "http"
req.URL.Host = srv.HTTPListenAddr().String()

resp, err := httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
if len(body) == 0 {
body = nil // to simplify test
}

// Verify that body is the same as we expect through gRPC.
require.Equal(t, tc.expectedResponse.Body, body)

// Verify that expected headers are in the response.
for _, h := range tc.expectedResponse.Headers {
assert.Equal(t, h.Values, resp.Header.Values(h.Key))
}

// Verify that header that indicates grpc error for httpgrpc.Server is not in the response.
assert.Empty(t, resp.Header.Get(server.ErrorMessageHeaderKey))
})
}
}

func mustMarshalStatus(t *testing.T, code codes.Code, msg string) []byte {
bytes, err := proto.Marshal(grpcstatus.New(code, msg).Proto())
require.NoError(t, err)
return bytes
}

type otlpLimitsMock struct{}

func (o otlpLimitsMock) ServiceOverloadStatusCodeOnRateLimitEnabled(_ string) bool {
return false
}

func (o otlpLimitsMock) OTelMetricSuffixesEnabled(_ string) bool {
return false
}
10 changes: 8 additions & 2 deletions vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8af3685

Please sign in to comment.