Skip to content

Commit

Permalink
Loki: fix validation error and metrics (#3307)
Browse files Browse the repository at this point in the history
* Remove the helper functions for creating errors as they cause unintended value manipulation if the value contains any valid golang printf substitute characters, e.g. %s

* Improve error message for invalid labels, increment metrics for lines dropped for invalid labels.
  • Loading branch information
slim-bean authored Feb 11, 2021
1 parent 1eac0bf commit 407afce
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 85 deletions.
10 changes: 8 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes))
continue
}
n := 0
Expand Down Expand Up @@ -230,7 +236,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
Expand Down Expand Up @@ -345,7 +351,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
}
ls, err := logql.ParseLabels(key)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
return "", httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
}
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ func TestDistributor(t *testing.T) {
},
{
lines: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(100, 100, 1000)),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 100, 100, 1000),
},
{
lines: 100,
maxLineSize: 1,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(1, 10, "{foo=\"bar\"}")),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 1, "{foo=\"bar\"}", 10),
},
{
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
Expand Down Expand Up @@ -180,9 +180,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 5, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 6))},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 1)},
},
},
"global strategy: limit should be evenly shared across distributors": {
Expand All @@ -192,9 +192,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 3, expectedError: nil},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 3))},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 3)},
{bytes: 2, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)},
},
},
"global strategy: burst should set to each distributor": {
Expand All @@ -204,9 +204,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 15, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 6))},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)},
},
},
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, entry.Timestamp)
}

if ts > ctx.creationGracePeriod {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, entry.Timestamp)
}

if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
Expand All @@ -73,7 +73,7 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

return nil
Expand All @@ -89,20 +89,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, ctx.userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
}

lastLabelName := ""
for _, l := range ls {
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
}
lastLabelName = l.Name
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func TestValidator_ValidateEntry(t *testing.T) {
}
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(testStreamLabels, testTime.Add(-time.Hour*5))),
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)),
},
{
"test too new",
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(testStreamLabels, testTime.Add(time.Hour*5))),
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5)),
},
{
"line too long",
Expand All @@ -61,7 +61,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
}
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(10, 11, testStreamLabels)),
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2)),
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2),
},
{
"label name too long",
Expand All @@ -113,7 +113,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{fooooo=\"bar\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg("{fooooo=\"bar\"}", "fooooo")),
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"),
},
{
"label value too long",
Expand All @@ -126,7 +126,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{foo=\"barrrrrr\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg("{foo=\"barrrrrr\"}", "barrrrrr")),
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"),
},
{
"duplicate label",
Expand All @@ -139,7 +139,23 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{foo=\"bar\", foo=\"barf\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg("{foo=\"bar\", foo=\"barf\"}", "foo")),
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"),
},
{
"label value contains %",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
"{foo=\"bar\", foo=\"barf%s\"}",
httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: int32(http.StatusBadRequest),
Body: []byte("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING)
}),
},
}
for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg)
}

labels, err := logql.ParseLabels(pushReqStream.Labels)
Expand Down
72 changes: 13 additions & 59 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,43 @@
package validation

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
discardReasonLabel = "reason"

// InvalidLabels is a reason for discarding log lines which have labels that cannot be parsed.
InvalidLabels = "invalid_labels"
InvalidLabelsErrorMsg = "Error parsing labels '%s' with error: %s"
// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"
rateLimitErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased"
RateLimited = "rate_limited"
RateLimitedErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased"
// LineTooLong is a reason for discarding too long log lines.
LineTooLong = "line_too_long"
lineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes"
LineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes"
// StreamLimit is a reason for discarding lines when we can't create a new stream
// because the limit of active streams has been reached.
StreamLimit = "stream_limit"
streamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
StreamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
GreaterThanMaxSampleAge = "greater_than_max_sample_age"
greaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
// TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period`
TooFarInFuture = "too_far_in_future"
tooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"
TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"
// MaxLabelNamesPerSeries is a reason for discarding a log line which has too many label names
MaxLabelNamesPerSeries = "max_label_names_per_series"
maxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d"
MaxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d"
// LabelNameTooLong is a reason for discarding a log line which has a label name too long
LabelNameTooLong = "label_name_too_long"
labelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'"
LabelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'"
// LabelValueTooLong is a reason for discarding a log line which has a lable value too long
LabelValueTooLong = "label_value_too_long"
labelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'"
LabelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'"
// DuplicateLabelNames is a reason for discarding a log line which has duplicate label names
DuplicateLabelNames = "duplicate_label_names"
duplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
)

// DiscardedBytes is a metric of the total discarded bytes, by reason.
Expand All @@ -64,48 +63,3 @@ var DiscardedSamples = prometheus.NewCounterVec(
func init() {
prometheus.MustRegister(DiscardedSamples, DiscardedBytes)
}

// RateLimitedErrorMsg returns an error string for rate limited requests
func RateLimitedErrorMsg(limit, lines, bytes int) string {
return fmt.Sprintf(rateLimitErrorMsg, limit, lines, bytes)
}

// LineTooLongErrorMsg returns an error string for a line which is too long
func LineTooLongErrorMsg(maxLength, entryLength int, stream string) string {
return fmt.Sprintf(lineTooLongErrorMsg, maxLength, stream, entryLength)
}

// StreamLimitErrorMsg returns an error string for requests refused for exceeding active stream limits
func StreamLimitErrorMsg() string {
return fmt.Sprint(streamLimitErrorMsg)
}

// GreaterThanMaxSampleAgeErrorMsg returns an error string for a line with a timestamp too old
func GreaterThanMaxSampleAgeErrorMsg(stream string, timestamp time.Time) string {
return fmt.Sprintf(greaterThanMaxSampleAgeErrorMsg, stream, timestamp)
}

// TooFarInFutureErrorMsg returns an error string for a line with a timestamp too far in the future
func TooFarInFutureErrorMsg(stream string, timestamp time.Time) string {
return fmt.Sprintf(tooFarInFutureErrorMsg, stream, timestamp)
}

// MaxLabelNamesPerSeriesErrorMsg returns an error string for a stream with too many labels
func MaxLabelNamesPerSeriesErrorMsg(stream string, labelCount, labelLimit int) string {
return fmt.Sprintf(maxLabelNamesPerSeriesErrorMsg, stream, labelCount, labelLimit)
}

// LabelNameTooLongErrorMsg returns an error string for a stream with a label name too long
func LabelNameTooLongErrorMsg(stream, label string) string {
return fmt.Sprintf(labelNameTooLongErrorMsg, stream, label)
}

// LabelValueTooLongErrorMsg returns an error string for a stream with a label value too long
func LabelValueTooLongErrorMsg(stream, labelValue string) string {
return fmt.Sprintf(labelValueTooLongErrorMsg, stream, labelValue)
}

// DuplicateLabelNamesErrorMsg returns an error string for a stream which has duplicate labels
func DuplicateLabelNamesErrorMsg(stream, label string) string {
return fmt.Sprintf(duplicateLabelNamesErrorMsg, stream, label)
}

0 comments on commit 407afce

Please sign in to comment.