Skip to content

Commit

Permalink
fix(rw2.0): reject remote write 2.0 based on content type (#10423)
Browse files Browse the repository at this point in the history
* fix(rw2.0): reject remote write 2.0 based on content type

The current solution returns 2xx, but doesn't actually ingest the samples.

Prometheus does detect this
prometheus-1  | time=2025-01-13T13:01:35.028Z level=ERROR
source=queue_manager.go:1670 msg="non-recoverable error"
component=remote remote_name=150c10
 url=http://mimir-1:8001/api/v1/push failedSampleCount=2000
failedHistogramCount=0 failedExemplarCount=0
err="sent v2 request with 2000 samples, 0 histograms and 0 exemplars;
 got 2xx, but PRW 2.0 response header statistics indicate 0 samples,
 0 histograms and 0 exemplars were accepted; assumining failure e.g.
 the target only supports PRW 1.0 prometheus.WriteRequest, but does not
 check the Content-Type header correctly"

But we can do better and also start working towards RW2.0 support.

* update changelog
* Copy integration test from POC

From #10432

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama authored Jan 15, 2025
1 parent 9b67339 commit 1d7978e
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [BUGFIX] MQE: Fix deriv with histograms #10383
* [BUGFIX] PromQL: Fix <aggr_over_time> functions with histograms https://github.com/prometheus/prometheus/pull/15711 #10400
* [BUGFIX] MQE: Fix <aggr_over_time> functions with histograms #10400
* [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423

### Mixin

Expand Down
127 changes: 127 additions & 0 deletions integration/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
"github.com/grafana/mimir/pkg/distributor/rw2"
)

func TestDistributor(t *testing.T) {
Expand Down Expand Up @@ -370,3 +372,128 @@ overrides:
})
}
}

// TestDistributorRemoteWrite2 verifies, end to end, that the distributor
// rejects Remote Write 2.0 pushes with HTTP 415 Unsupported Media Type
// instead of returning 2xx without ingesting the samples (#10423).
// The query-related fields in the test cases are placeholders for when
// RW2.0 ingestion is actually supported.
func TestDistributorRemoteWrite2(t *testing.T) {
	queryEnd := time.Now().Round(time.Second)
	queryStart := queryEnd.Add(-1 * time.Hour)

	testCases := map[string]struct {
		inRemoteWrite   []*promRW2.Request // RW2.0 payloads pushed to the distributor.
		runtimeConfig   string             // Runtime config the distributor must pick up before the pushes.
		queries         map[string]model.Matrix
		exemplarQueries map[string][]promv1.ExemplarQueryResult
	}{
		"no special features": {
			inRemoteWrite: []*promRW2.Request{
				rw2.AddFloatSeries(
					nil,
					labels.FromStrings("__name__", "foobar"),
					[]promRW2.Sample{{Timestamp: queryStart.UnixMilli(), Value: 100}},
					promRW2.Metadata_METRIC_TYPE_COUNTER,
					"some help",
					"someunit",
					0,
					nil),
			},
			queries: map[string]model.Matrix{
				"foobar": {{
					Metric: model.Metric{"__name__": "foobar"},
					Values: []model.SamplePair{{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(100)}},
				}},
			},
		},
	}

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start with an empty runtime config; individual test cases may replace it.
	previousRuntimeConfig := ""
	require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(previousRuntimeConfig)))

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, blocksBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	baseFlags := map[string]string{
		"-distributor.ingestion-tenant-shard-size":           "0",
		"-ingester.ring.heartbeat-period":                    "1s",
		"-distributor.ha-tracker.enable":                     "true",
		"-distributor.ha-tracker.enable-for-all-users":       "true",
		"-distributor.ha-tracker.store":                      "consul",
		"-distributor.ha-tracker.consul.hostname":            consul.NetworkHTTPEndpoint(),
		"-distributor.ha-tracker.prefix":                     "prom_ha/",
		"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false),
	}

	flags := mergeFlags(
		BlocksStorageFlags(),
		BlocksStorageS3Flags(),
		baseFlags,
	)

	// We want only distributor to be reloading runtime config.
	distributorFlags := mergeFlags(flags, map[string]string{
		"-runtime-config.file":          filepath.Join(e2e.ContainerSharedDir, "runtime.yaml"),
		"-runtime-config.reload-period": "100ms",
		// Set non-zero default for number of exemplars. That way our values used in the test (0 and 100) will show up in runtime config diff.
		"-ingester.max-global-exemplars-per-user": "3",
	})

	// Ingester will not reload runtime config.
	ingesterFlags := mergeFlags(flags, map[string]string{
		// Ingester will always see exemplars enabled. We do this to avoid waiting for ingester to apply new setting to TSDB.
		"-ingester.max-global-exemplars-per-user": "100",
	})

	// Start Mimir components.
	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags)
	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags)
	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

	// Wait until distributor has updated the ring.
	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	// Wait until querier has updated the ring.
	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
	require.NoError(t, err)

	// mode=diff only reports values that differ from the defaults, which lets us
	// detect the reload by comparing the endpoint's output before and after.
	runtimeConfigURL := fmt.Sprintf("http://%s/runtime_config?mode=diff", distributor.HTTPEndpoint())

	for testName, tc := range testCases {
		t.Run(testName, func(t *testing.T) {
			for _, ser := range tc.inRemoteWrite {
				// Only rewrite the runtime config when this case needs a different one.
				if tc.runtimeConfig != previousRuntimeConfig {
					currentRuntimeConfig, err := getURL(runtimeConfigURL)
					require.NoError(t, err)

					// Write new runtime config
					require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(tc.runtimeConfig)))

					// Wait until distributor has reloaded runtime config.
					test.Poll(t, 1*time.Second, true, func() interface{} {
						newRuntimeConfig, err := getURL(runtimeConfigURL)
						require.NoError(t, err)
						return currentRuntimeConfig != newRuntimeConfig
					})

					previousRuntimeConfig = tc.runtimeConfig
				}

				// RW2.0 is not supported yet: the distributor must reject the push
				// with 415 rather than pretend success with a 2xx.
				res, err := client.PushRW2(ser)
				require.NoError(t, err)
				require.Equal(t, http.StatusUnsupportedMediaType, res.StatusCode)
			}

			// Placeholder for actual query tests.
		})
	}
}
33 changes: 33 additions & 0 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

Expand Down Expand Up @@ -186,6 +187,38 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushRW2 sends a Prometheus Remote Write 2.0 request to the distributor's
// push endpoint and returns the HTTP response.
func (c *Client) PushRW2(writeRequest *promRW2.Request) (*http.Response, error) {
	// Serialize the write request.
	payload, err := proto.Marshal(writeRequest)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// Build the HTTP request with the snappy-compressed payload.
	compressed := snappy.Encode(nil, payload)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://%s/api/v1/push", c.distributorAddress), bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}

	// The Content-Type proto parameter is what marks this as a RW2.0 request.
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
	req.Header.Set("X-Scope-OrgID", c.orgID)

	// Execute the HTTP request.
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}

	defer res.Body.Close()
	return res, nil
}

// PushOTLP the input timeseries to the remote endpoint in OTLP format
func (c *Client) PushOTLP(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) (*http.Response, error) {
// Create write request
Expand Down
117 changes: 83 additions & 34 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"math/rand"
"net/http"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -149,49 +150,63 @@ func handler(
logger = utillog.WithSourceIPs(source, logger)
}
}
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest

userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}

// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
var supplier supplierFunc
isRW2, err := isRemoteWrite2(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
if isRW2 {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
// unless the client switches to remote-write v1.
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
}
} else {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest

userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}

if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
}

rb.CleanUp()
return nil, nil, err
}
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}

if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}
rb.CleanUp()
return nil, nil, err
}

if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}
if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)
if err := push(ctx, req); err != nil {
Expand Down Expand Up @@ -226,6 +241,40 @@ func handler(
})
}

func isRemoteWrite2(r *http.Request) (bool, error) {
const appProtoContentType = "application/x-protobuf"

contentType := r.Header.Get("Content-Type")
if contentType == "" {
// If the content type is not set, we assume it is remote write v1.
return false, nil
}
parts := strings.Split(contentType, ";")
if parts[0] != appProtoContentType {
return false, fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType)
}

// Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter
for _, p := range parts[1:] {
pair := strings.Split(p, "=")
if len(pair) != 2 {
return false, fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType)
}
if pair[0] == "proto" {
switch pair[1] {
case "prometheus.WriteRequest":
return false, nil
case "io.prometheus.write.v2.Request":
return true, nil
default:
return false, fmt.Errorf("got %v content type; expected prometheus.WriteRequest or io.prometheus.write.v2.Request", contentType)
}
}
}
// No "proto=" parameter, assuming v1.
return false, nil
}

func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
const jitterFactor = 0.5

Expand Down
Loading

0 comments on commit 1d7978e

Please sign in to comment.