diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index eb88bc418713c..7d916954efbed 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -197,6 +197,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) if err != nil { validationErr = err + validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries))) + bytes := 0 + for _, e := range stream.Entries { + bytes += len(e.Line) + } + validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes)) continue } n := 0 @@ -230,7 +236,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // Return a 429 to indicate to the client they are being rate limited validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount)) validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize)) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize) } const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck @@ -345,7 +351,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, } ls, err := logql.ParseLabels(key) if err != nil { - return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err) + return "", httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err) } // ensure labels are correctly sorted. if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 5903c701e0571..5b4ac06dd5247 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -59,19 +59,19 @@ func TestDistributor(t *testing.T) { }, { lines: 100, - expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(100, 100, 1000)), + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 100, 100, 1000), }, { lines: 100, maxLineSize: 1, expectedResponse: success, - expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(1, 10, "{foo=\"bar\"}")), + expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 1, "{foo=\"bar\"}", 10), }, { lines: 100, mangleLabels: true, expectedResponse: success, - expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"), + expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"), }, } { t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) { @@ -180,9 +180,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)), pushes: []testPush{ {bytes: 5, expectedError: nil}, - {bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 6))}, + {bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 6)}, {bytes: 5, expectedError: nil}, - {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 1))}, + {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 1)}, }, }, "global strategy: limit should be evenly shared across distributors": { @@ -192,9 +192,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)), pushes: []testPush{ {bytes: 3, expectedError: nil}, - {bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 3))}, + {bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 3)}, {bytes: 2, expectedError: nil}, - {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))}, + {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)}, }, }, "global strategy: burst should set to each distributor": { @@ -204,9 +204,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)), pushes: []testPush{ {bytes: 15, expectedError: nil}, - {bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 6))}, + {bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 6)}, {bytes: 5, expectedError: nil}, - {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))}, + {bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)}, }, }, } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index a680b85acd023..1bc73d02c68d4 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -57,13 +57,13 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge { validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line))) - return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, entry.Timestamp) } if ts > ctx.creationGracePeriod { validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line))) - return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, entry.Timestamp) } if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize { @@ -73,7 +73,7 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log // for parity. validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line))) - return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) } return nil @@ -89,20 +89,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea bytes += len(e.Line) } validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, ctx.userID).Add(float64(bytes)) - return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) } lastLabelName := "" for _, l := range ls { if len(l.Name) > ctx.maxLabelNameLength { updateMetrics(validation.LabelNameTooLong, ctx.userID, stream) - return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name) } else if len(l.Value) > ctx.maxLabelValueLength { updateMetrics(validation.LabelValueTooLong, ctx.userID, stream) - return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value) } else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 { updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream) - return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name)) + return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name) } lastLabelName = l.Name } diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index c1d69398f3a46..b97c13415c995 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -43,14 +43,14 @@ func TestValidator_ValidateEntry(t *testing.T) { } }, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(testStreamLabels, testTime.Add(-time.Hour*5))), + httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)), }, { "test too new", "test", nil, logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(testStreamLabels, testTime.Add(time.Hour*5))), + httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5)), }, { "line too long", @@ -61,7 +61,7 @@ func TestValidator_ValidateEntry(t *testing.T) { } }, logproto.Entry{Timestamp: testTime, Line: "12345678901"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(10, 11, testStreamLabels)), + httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11), }, } for _, tt := range tests { @@ -101,7 +101,7 @@ func TestValidator_ValidateLabels(t *testing.T) { return &validation.Limits{MaxLabelNamesPerSeries: 2} }, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2)), + httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2), }, { "label name too long", @@ -113,7 +113,7 @@ func TestValidator_ValidateLabels(t *testing.T) { } }, "{fooooo=\"bar\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg("{fooooo=\"bar\"}", "fooooo")), + httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"), }, { "label value too long", @@ -126,7 +126,7 @@ func TestValidator_ValidateLabels(t *testing.T) { } }, "{foo=\"barrrrrr\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg("{foo=\"barrrrrr\"}", "barrrrrr")), + httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"), }, { "duplicate label", @@ -139,7 +139,23 @@ func TestValidator_ValidateLabels(t *testing.T) { } }, "{foo=\"bar\", foo=\"barf\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg("{foo=\"bar\", foo=\"barf\"}", "foo")), + httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"), + }, + { + "label value contains %", + "test", + func(userID string) *validation.Limits { + return &validation.Limits{ + MaxLabelNamesPerSeries: 2, + MaxLabelNameLength: 5, + MaxLabelValueLength: 5, + } + }, + "{foo=\"bar\", foo=\"barf%s\"}", + httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ + Code: int32(http.StatusBadRequest), + Body: []byte("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING) + }), }, } for _, tt := range tests { diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6d1b0c7bd7552..5973a350bcd34 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -216,7 +216,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r bytes += len(e.Line) } validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg()) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg) } labels, err := logql.ParseLabels(pushReqStream.Labels) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 035adc4deeff3..6441fe036e842 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -1,44 +1,43 @@ package validation import ( - "fmt" - "time" - "github.com/prometheus/client_golang/prometheus" ) const ( discardReasonLabel = "reason" - + // InvalidLabels is a reason for discarding log lines which have labels that cannot be parsed. + InvalidLabels = "invalid_labels" + InvalidLabelsErrorMsg = "Error parsing labels '%s' with error: %s" // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. - RateLimited = "rate_limited" - rateLimitErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased" + RateLimited = "rate_limited" + RateLimitedErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased" // LineTooLong is a reason for discarding too long log lines. LineTooLong = "line_too_long" - lineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes" + LineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes" // StreamLimit is a reason for discarding lines when we can't create a new stream // because the limit of active streams has been reached. StreamLimit = "stream_limit" - streamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased" + StreamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased" // GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age` GreaterThanMaxSampleAge = "greater_than_max_sample_age" - greaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v" + GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v" // TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period` TooFarInFuture = "too_far_in_future" - tooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v" + TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v" // MaxLabelNamesPerSeries is a reason for discarding a log line which has too many label names MaxLabelNamesPerSeries = "max_label_names_per_series" - maxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d" + MaxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d" // LabelNameTooLong is a reason for discarding a log line which has a label name too long LabelNameTooLong = "label_name_too_long" - labelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'" + LabelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'" // LabelValueTooLong is a reason for discarding a log line which has a lable value too long LabelValueTooLong = "label_value_too_long" - labelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'" + LabelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'" // DuplicateLabelNames is a reason for discarding a log line which has duplicate label names DuplicateLabelNames = "duplicate_label_names" - duplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'" + DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'" ) // DiscardedBytes is a metric of the total discarded bytes, by reason. @@ -64,48 +63,3 @@ var DiscardedSamples = prometheus.NewCounterVec( func init() { prometheus.MustRegister(DiscardedSamples, DiscardedBytes) } - -// RateLimitedErrorMsg returns an error string for rate limited requests -func RateLimitedErrorMsg(limit, lines, bytes int) string { - return fmt.Sprintf(rateLimitErrorMsg, limit, lines, bytes) -} - -// LineTooLongErrorMsg returns an error string for a line which is too long -func LineTooLongErrorMsg(maxLength, entryLength int, stream string) string { - return fmt.Sprintf(lineTooLongErrorMsg, maxLength, stream, entryLength) -} - -// StreamLimitErrorMsg returns an error string for requests refused for exceeding active stream limits -func StreamLimitErrorMsg() string { - return fmt.Sprint(streamLimitErrorMsg) -} - -// GreaterThanMaxSampleAgeErrorMsg returns an error string for a line with a timestamp too old -func GreaterThanMaxSampleAgeErrorMsg(stream string, timestamp time.Time) string { - return fmt.Sprintf(greaterThanMaxSampleAgeErrorMsg, stream, timestamp) -} - -// TooFarInFutureErrorMsg returns an error string for a line with a timestamp too far in the future -func TooFarInFutureErrorMsg(stream string, timestamp time.Time) string { - return fmt.Sprintf(tooFarInFutureErrorMsg, stream, timestamp) -} - -// MaxLabelNamesPerSeriesErrorMsg returns an error string for a stream with too many labels -func MaxLabelNamesPerSeriesErrorMsg(stream string, labelCount, labelLimit int) string { - return fmt.Sprintf(maxLabelNamesPerSeriesErrorMsg, stream, labelCount, labelLimit) -} - -// LabelNameTooLongErrorMsg returns an error string for a stream with a label name too long -func LabelNameTooLongErrorMsg(stream, label string) string { - return fmt.Sprintf(labelNameTooLongErrorMsg, stream, label) -} - -// LabelValueTooLongErrorMsg returns an error string for a stream with a label value too long -func LabelValueTooLongErrorMsg(stream, labelValue string) string { - return fmt.Sprintf(labelValueTooLongErrorMsg, stream, labelValue) -} - -// DuplicateLabelNamesErrorMsg returns an error string for a stream which has duplicate labels -func DuplicateLabelNamesErrorMsg(stream, label string) string { - return fmt.Sprintf(duplicateLabelNamesErrorMsg, stream, label) -}