diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7d05eb97608c2..afcbc5d72526f 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "github.com/grafana/loki/pkg/util/validation" "net/http" "sync" "time" @@ -129,13 +130,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { var appendErr error for _, s := range req.Streams { - labels, err := util.ToClientLabels(s.Labels) - if err != nil { - appendErr = err - continue - } - stream, err := i.getOrCreateStream(labels) + stream, err := i.getOrCreateStream(s) if err != nil { appendErr = err continue @@ -153,7 +149,11 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { return appendErr } -func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) { +func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, error) { + labels, err := util.ToClientLabels(pushReqStream.Labels) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } rawFp := client.FastFingerprint(labels) fp := i.mapper.mapFP(rawFp, labels) @@ -162,8 +162,14 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err return stream, nil } - err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) + err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) if err != nil { + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) + bytes := 0 + for _, e := range pushReqStream.Entries { + bytes += len(e.Line) + } + validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error()) } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 50bfed5473882..c425672e207f2 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -10,8 +10,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/util" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" @@ -124,15 +122,12 @@ func TestSyncPeriod(t *testing.T) { result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)}) tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds()))) } - - err = inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}}) - require.NoError(t, err) - - // let's verify results. - ls, err := util.ToClientLabels(lbls) + pr := &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}} + err = inst.Push(context.Background(), pr) require.NoError(t, err) - s, err := inst.getOrCreateStream(ls) + // let's verify results + s, err := inst.getOrCreateStream(pr.Streams[0]) require.NoError(t, err) // make sure each chunk spans max 'sync period' time diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 5f1b52002754e..382c1c0be70bb 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -8,7 +8,7 @@ import ( ) const ( - errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded" + errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)" ) // RingCount is the interface exposed by a ring implementation which allows @@ -37,32 +37,28 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current // number of streams in input and returns an error if so. func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { - actualLimit := l.maxStreamsPerUser(userID) - if streams < actualLimit { - return nil - } - - localLimit := l.limits.MaxLocalStreamsPerUser(userID) - globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) - - return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit) -} - -func (l *Limiter) maxStreamsPerUser(userID string) int { + // Start by setting the local limit either from override or default localLimit := l.limits.MaxLocalStreamsPerUser(userID) // We can assume that streams are evenly distributed across ingesters // so we do convert the global limit into a local limit globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) - localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit)) + adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit) + + // Set the calculated limit to the lesser of the local limit or the new calculated global limit + calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit) // If both the local and global limits are disabled, we just // use the largest int value - if localLimit == 0 { - localLimit = math.MaxInt32 + if calculatedLimit == 0 { + calculatedLimit = math.MaxInt32 + } + + if streams < calculatedLimit { + return nil } - return localLimit + return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit) } func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index c01a06862824d..e43e65d74b205 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -11,112 +11,86 @@ import ( "github.com/grafana/loki/pkg/util/validation" ) -func TestLimiter_maxStreamsPerUser(t *testing.T) { +func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { tests := map[string]struct { maxLocalStreamsPerUser int maxGlobalStreamsPerUser int ringReplicationFactor int ringIngesterCount int - expected int + streams int + expected error }{ + "both local and global limit are disabled": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 1, + ringIngesterCount: 1, + streams: 100, + expected: nil, + }, + "current number of streams is below the limit": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + streams: 299, + expected: nil, + }, + "current number of streams is above the limit": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + streams: 300, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 300, 300, 0, 1000, 300), + }, "both local and global limits are disabled": { maxLocalStreamsPerUser: 0, maxGlobalStreamsPerUser: 0, ringReplicationFactor: 1, ringIngesterCount: 1, - expected: math.MaxInt32, + streams: math.MaxInt32 - 1, + expected: nil, }, "only local limit is enabled": { maxLocalStreamsPerUser: 1000, maxGlobalStreamsPerUser: 0, ringReplicationFactor: 1, ringIngesterCount: 1, - expected: 1000, + streams: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 1000, 1000, 0, 0), }, "only global limit is enabled with replication-factor=1": { maxLocalStreamsPerUser: 0, maxGlobalStreamsPerUser: 1000, ringReplicationFactor: 1, ringIngesterCount: 10, - expected: 100, + streams: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 100, 0, 1000, 100), }, "only global limit is enabled with replication-factor=3": { maxLocalStreamsPerUser: 0, maxGlobalStreamsPerUser: 1000, ringReplicationFactor: 3, ringIngesterCount: 10, - expected: 300, + streams: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 0, 1000, 300), }, "both local and global limits are set with local limit < global limit": { maxLocalStreamsPerUser: 150, maxGlobalStreamsPerUser: 1000, ringReplicationFactor: 3, ringIngesterCount: 10, - expected: 150, + streams: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 150, 150, 1000, 300), }, "both local and global limits are set with local limit > global limit": { maxLocalStreamsPerUser: 500, maxGlobalStreamsPerUser: 1000, ringReplicationFactor: 3, ringIngesterCount: 10, - expected: 300, - }, - } - - for testName, testData := range tests { - testData := testData - - t.Run(testName, func(t *testing.T) { - // Mock the ring - ring := &ringCountMock{count: testData.ringIngesterCount} - - // Mock limits - limits, err := validation.NewOverrides(validation.Limits{ - MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser, - MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser, - }, nil) - require.NoError(t, err) - - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor) - actual := limiter.maxStreamsPerUser("test") - - assert.Equal(t, testData.expected, actual) - }) - } -} - -func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { - tests := map[string]struct { - maxLocalStreamsPerUser int - maxGlobalStreamsPerUser int - ringReplicationFactor int - ringIngesterCount int - streams int - expected error - }{ - "both local and global limit are disabled": { - maxLocalStreamsPerUser: 0, - maxGlobalStreamsPerUser: 0, - ringReplicationFactor: 1, - ringIngesterCount: 1, - streams: 100, - expected: nil, - }, - "current number of streams is below the limit": { - maxLocalStreamsPerUser: 0, - maxGlobalStreamsPerUser: 1000, - ringReplicationFactor: 3, - ringIngesterCount: 10, - streams: 299, - expected: nil, - }, - "current number of streams is above the limit": { - maxLocalStreamsPerUser: 0, - maxGlobalStreamsPerUser: 1000, - ringReplicationFactor: 3, - ringIngesterCount: 10, - streams: 300, - expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, 0, 1000, 300), + streams: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300), }, } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 9293a989c17fb..97f54caa15e20 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -10,6 +10,9 @@ const ( RateLimited = "rate_limited" // LineTooLong is a reason for discarding too long log lines. LineTooLong = "line_too_long" + // StreamLimit is a reason for discarding lines when we can't create a new stream + // because the limit of active streams has been reached. + StreamLimit = "stream_limit" ) // DiscardedBytes is a metric of the total discarded bytes, by reason.