From 1d7978e124e30ff028b5edfd2e3ee9d0feed3eed Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Wed, 15 Jan 2025 17:51:20 +0100 Subject: [PATCH] fix(rw2.0): reject remote write 2.0 based on content type (#10423) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(rw2.0): reject remote write 2.0 based on content type The current solution returns 2xx , but doesn't actually ingest the samples. Prometheus does detect this prometheus-1 | time=2025-01-13T13:01:35.028Z level=ERROR source=queue_manager.go:1670 msg="non-recoverable error" component=remote remote_name=150c10 url=http://mimir-1:8001/api/v1/push failedSampleCount=2000 failedHistogramCount=0 failedExemplarCount=0 err="sent v2 request with 2000 samples, 0 histograms and 0 exemplars; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted; assumining failure e.g. the target only supports PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly" But we can do better and also start working towards RW2.0 support. * update changelog * Copy integration test from POC From https://github.com/grafana/mimir/pull/10432 Signed-off-by: György Krajcsovits --- CHANGELOG.md | 1 + integration/distributor_test.go | 127 ++++++++++++++++++++++++++++++++ integration/e2emimir/client.go | 33 +++++++++ pkg/distributor/push.go | 117 ++++++++++++++++++++--------- pkg/distributor/rw2/utils.go | 104 ++++++++++++++++++++++++++ 5 files changed, 348 insertions(+), 34 deletions(-) create mode 100644 pkg/distributor/rw2/utils.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e77d642119..feaa09e6dcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ * [BUGFIX] MQE: Fix deriv with histograms #10383 * [BUGFIX] PromQL: Fix functions with histograms https://github.com/prometheus/prometheus/pull/15711 #10400 * [BUGFIX] MQE: Fix functions with histograms #10400 +* [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423 ### Mixin diff --git a/integration/distributor_test.go b/integration/distributor_test.go index 7b872762e68..4f2d73d3ebb 100644 --- a/integration/distributor_test.go +++ b/integration/distributor_test.go @@ -18,9 +18,11 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/stretchr/testify/require" "github.com/grafana/mimir/integration/e2emimir" + "github.com/grafana/mimir/pkg/distributor/rw2" ) func TestDistributor(t *testing.T) { @@ -370,3 +372,128 @@ overrides: }) } } + +func TestDistributorRemoteWrite2(t *testing.T) { + queryEnd := time.Now().Round(time.Second) + queryStart := queryEnd.Add(-1 * time.Hour) + + testCases := map[string]struct { + inRemoteWrite []*promRW2.Request + runtimeConfig string + queries map[string]model.Matrix + exemplarQueries map[string][]promv1.ExemplarQueryResult + }{ + "no special features": { + inRemoteWrite: []*promRW2.Request{ + rw2.AddFloatSeries( + nil, + labels.FromStrings("__name__", "foobar"), + []promRW2.Sample{{Timestamp: queryStart.UnixMilli(), Value: 100}}, + promRW2.Metadata_METRIC_TYPE_COUNTER, + "some help", + "someunit", + 0, + nil), + }, + queries: map[string]model.Matrix{ + "foobar": {{ + Metric: model.Metric{"__name__": "foobar"}, + Values: []model.SamplePair{{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(100)}}, + }}, + }, + }, + } + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + previousRuntimeConfig := "" + require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(previousRuntimeConfig))) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, blocksBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + baseFlags := map[string]string{ + "-distributor.ingestion-tenant-shard-size": "0", + "-ingester.ring.heartbeat-period": "1s", + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.store": "consul", + "-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(), + "-distributor.ha-tracker.prefix": "prom_ha/", + "-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false), + } + + flags := mergeFlags( + BlocksStorageFlags(), + BlocksStorageS3Flags(), + baseFlags, + ) + + // We want only distributor to be reloading runtime config. + distributorFlags := mergeFlags(flags, map[string]string{ + "-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, "runtime.yaml"), + "-runtime-config.reload-period": "100ms", + // Set non-zero default for number of exemplars. That way our values used in the test (0 and 100) will show up in runtime config diff. + "-ingester.max-global-exemplars-per-user": "3", + }) + + // Ingester will not reload runtime config. + ingesterFlags := mergeFlags(flags, map[string]string{ + // Ingester will always see exemplars enabled. We do this to avoid waiting for ingester to apply new setting to TSDB. + "-ingester.max-global-exemplars-per-user": "100", + }) + + // Start Mimir components. + distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags) + ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags) + querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) + + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Wait until querier has updated the ring. + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + runtimeConfigURL := fmt.Sprintf("http://%s/runtime_config?mode=diff", distributor.HTTPEndpoint()) + + for testName, tc := range testCases { + t.Run(testName, func(t *testing.T) { + for _, ser := range tc.inRemoteWrite { + if tc.runtimeConfig != previousRuntimeConfig { + currentRuntimeConfig, err := getURL(runtimeConfigURL) + require.NoError(t, err) + + // Write new runtime config + require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(tc.runtimeConfig))) + + // Wait until distributor has reloaded runtime config. + test.Poll(t, 1*time.Second, true, func() interface{} { + newRuntimeConfig, err := getURL(runtimeConfigURL) + require.NoError(t, err) + return currentRuntimeConfig != newRuntimeConfig + }) + + previousRuntimeConfig = tc.runtimeConfig + } + + res, err := client.PushRW2(ser) + require.NoError(t, err) + require.Equal(t, http.StatusUnsupportedMediaType, res.StatusCode) + } + + // Placeholder for actual query tests. + }) + } +} diff --git a/integration/e2emimir/client.go b/integration/e2emimir/client.go index 6055696967e..0460cdaf227 100644 --- a/integration/e2emimir/client.go +++ b/integration/e2emimir/client.go @@ -31,6 +31,7 @@ import ( promConfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo + promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage/remote" yaml "gopkg.in/yaml.v3" @@ -186,6 +187,38 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) { return res, nil } +func (c *Client) PushRW2(writeRequest *promRW2.Request) (*http.Response, error) { + // Create write request + data, err := proto.Marshal(writeRequest) + if err != nil { + return nil, err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/push", c.distributorAddress), bytes.NewReader(compressed)) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request") + req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + return res, nil +} + // PushOTLP the input timeseries to the remote endpoint in OTLP format func (c *Client) PushOTLP(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) (*http.Response, error) { // Create write request diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index 812250eb2a3..d19643cd611 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -13,6 +13,7 @@ import ( "math/rand" "net/http" "strconv" + "strings" "time" "github.com/go-kit/log" @@ -149,49 +150,63 @@ func handler( logger = utillog.WithSourceIPs(source, logger) } } - supplier := func() (*mimirpb.WriteRequest, func(), error) { - rb := util.NewRequestBuffers(requestBufferPool) - var req mimirpb.PreallocWriteRequest - userID, err := tenant.TenantID(ctx) - if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID - return nil, nil, errors.Wrap(err, "failed to get tenant ID") - } - - // userID might be empty if none was in the ctx, in this case just use the default setting. - if limits.MaxGlobalExemplarsPerUser(userID) == 0 { - // The user is not allowed to send exemplars, so there is no need to unmarshal them. - // Optimization to avoid the allocations required for unmarshaling exemplars. - req.SkipUnmarshalingExemplars = true + var supplier supplierFunc + isRW2, err := isRemoteWrite2(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + if isRW2 { + supplier = func() (*mimirpb.WriteRequest, func(), error) { + // Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable + // unless the client switches to remote-write v1. + return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported") } + } else { + supplier = func() (*mimirpb.WriteRequest, func(), error) { + rb := util.NewRequestBuffers(requestBufferPool) + var req mimirpb.PreallocWriteRequest + + userID, err := tenant.TenantID(ctx) + if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID + return nil, nil, errors.Wrap(err, "failed to get tenant ID") + } - if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil { - // Check for httpgrpc error, default to client error if parsing failed - if _, ok := httpgrpc.HTTPResponseFromError(err); !ok { - err = httpgrpc.Error(http.StatusBadRequest, err.Error()) + // userID might be empty if none was in the ctx, in this case just use the default setting. + if limits.MaxGlobalExemplarsPerUser(userID) == 0 { + // The user is not allowed to send exemplars, so there is no need to unmarshal them. + // Optimization to avoid the allocations required for unmarshaling exemplars. + req.SkipUnmarshalingExemplars = true } - rb.CleanUp() - return nil, nil, err - } + if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil { + // Check for httpgrpc error, default to client error if parsing failed + if _, ok := httpgrpc.HTTPResponseFromError(err); !ok { + err = httpgrpc.Error(http.StatusBadRequest, err.Error()) + } - if allowSkipLabelNameValidation { - req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true" - } else { - req.SkipLabelValidation = false - } + rb.CleanUp() + return nil, nil, err + } - if allowSkipLabelCountValidation { - req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true" - } else { - req.SkipLabelCountValidation = false - } + if allowSkipLabelNameValidation { + req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true" + } else { + req.SkipLabelValidation = false + } - cleanup := func() { - mimirpb.ReuseSlice(req.Timeseries) - rb.CleanUp() + if allowSkipLabelCountValidation { + req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true" + } else { + req.SkipLabelCountValidation = false + } + + cleanup := func() { + mimirpb.ReuseSlice(req.Timeseries) + rb.CleanUp() + } + return &req.WriteRequest, cleanup, nil } - return &req.WriteRequest, cleanup, nil } req := newRequest(supplier) if err := push(ctx, req); err != nil { @@ -226,6 +241,40 @@ func handler( }) } +func isRemoteWrite2(r *http.Request) (bool, error) { + const appProtoContentType = "application/x-protobuf" + + contentType := r.Header.Get("Content-Type") + if contentType == "" { + // If the content type is not set, we assume it is remote write v1. + return false, nil + } + parts := strings.Split(contentType, ";") + if parts[0] != appProtoContentType { + return false, fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType) + } + + // Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter + for _, p := range parts[1:] { + pair := strings.Split(p, "=") + if len(pair) != 2 { + return false, fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType) + } + if pair[0] == "proto" { + switch pair[1] { + case "prometheus.WriteRequest": + return false, nil + case "io.prometheus.write.v2.Request": + return true, nil + default: + return false, fmt.Errorf("got %v content type; expected prometheus.WriteRequest or io.prometheus.write.v2.Request", contentType) + } + } + } + // No "proto=" parameter, assuming v1. + return false, nil +} + func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string { const jitterFactor = 0.5 diff --git a/pkg/distributor/rw2/utils.go b/pkg/distributor/rw2/utils.go new file mode 100644 index 00000000000..90d49cafc87 --- /dev/null +++ b/pkg/distributor/rw2/utils.go @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package rw2 + +import ( + "fmt" + + "github.com/prometheus/prometheus/model/labels" + promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" +) + +// NewWriteRequest is used in testing Remote Write 2.0 to generate a new write request. +func NewWriteRequest() *promRW2.Request { + return &promRW2.Request{} +} + +// AddFloatSeries is used in testing Remote Write 2.0 to add a float series to a write request. +// Write 0 into createdTimestamp to not use it. +func AddFloatSeries( + req *promRW2.Request, + lbls labels.Labels, + floats []promRW2.Sample, + metricType promRW2.Metadata_MetricType, + help string, + unit string, + createdTimestamp int64, + exemplars []promRW2.Exemplar) *promRW2.Request { + if req == nil { + req = NewWriteRequest() + } + + var labelsRefs []uint32 + lbls.Range(func(l labels.Label) { + labelsRefs = append(labelsRefs, getSymbol(l.Name, &req.Symbols)) + labelsRefs = append(labelsRefs, getSymbol(l.Value, &req.Symbols)) + }) + + ts := promRW2.TimeSeries{ + LabelsRefs: labelsRefs, + Samples: floats, + Metadata: promRW2.Metadata{ + Type: metricType, + HelpRef: getSymbol(help, &req.Symbols), + UnitRef: getSymbol(unit, &req.Symbols), + }, + Exemplars: exemplars, + CreatedTimestamp: createdTimestamp, + } + fmt.Printf("KRAJO: AddFloatSeries: %v\n", ts.Metadata) + req.Timeseries = append(req.Timeseries, ts) + + return req +} + +// AddHistogramSeries is used in testing Remote Write 2.0 to add a histogram series to a write request. +// Write 0 into createdTimestamp to not use it. +func AddHistogramSeries( + req *promRW2.Request, + lbls labels.Labels, + histograms []promRW2.Histogram, + help string, + unit string, + createdTimestamp int64, + exemplars []promRW2.Exemplar) *promRW2.Request { + if req == nil { + req = NewWriteRequest() + } + + var labelsRefs []uint32 + lbls.Range(func(l labels.Label) { + labelsRefs = append(labelsRefs, getSymbol(l.Name, &req.Symbols)) + labelsRefs = append(labelsRefs, getSymbol(l.Value, &req.Symbols)) + }) + + metricType := promRW2.Metadata_METRIC_TYPE_HISTOGRAM + if histograms[0].ResetHint == promRW2.Histogram_RESET_HINT_GAUGE { + metricType = promRW2.Metadata_METRIC_TYPE_GAUGEHISTOGRAM + } + + ts := promRW2.TimeSeries{ + LabelsRefs: labelsRefs, + Histograms: histograms, + Metadata: promRW2.Metadata{ + Type: metricType, + HelpRef: getSymbol(help, &req.Symbols), + UnitRef: getSymbol(unit, &req.Symbols), + }, + Exemplars: exemplars, + CreatedTimestamp: createdTimestamp, + } + req.Timeseries = append(req.Timeseries, ts) + + return req +} + +func getSymbol(sym string, symbols *[]string) uint32 { + for i, s := range *symbols { + if s == sym { + return uint32(i) + } + } + *symbols = append(*symbols, sym) + return uint32(len(*symbols) - 1) +}