Skip to content
This repository has been archived by the owner on Dec 22, 2022. It is now read-only.

Commit

Permalink
Loki: Improve logging and add metrics to streams dropped by stream li…
Browse files Browse the repository at this point in the history
…mit (grafana#2012)

* Improve the log message when we drop streams because a user is hitting a stream limit.
Also increment the discarded samples and discarded bytes metrics when this happens.

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* improving comments

Signed-off-by: Ed Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean authored Apr 29, 2020
1 parent 5e34df8 commit 0ab1b28
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 99 deletions.
22 changes: 14 additions & 8 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"github.com/grafana/loki/pkg/util/validation"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -129,13 +130,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {

var appendErr error
for _, s := range req.Streams {
labels, err := util.ToClientLabels(s.Labels)
if err != nil {
appendErr = err
continue
}

stream, err := i.getOrCreateStream(labels)
stream, err := i.getOrCreateStream(s)
if err != nil {
appendErr = err
continue
Expand All @@ -153,7 +149,11 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

Expand All @@ -162,8 +162,14 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}

Expand Down
13 changes: 4 additions & 9 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/util"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"

Expand Down Expand Up @@ -124,15 +122,12 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}

err = inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}})
require.NoError(t, err)

// let's verify results.
ls, err := util.ToClientLabels(lbls)
pr := &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

s, err := inst.getOrCreateStream(ls)
// let's verify results
s, err := inst.getOrCreateStream(pr.Streams[0])
require.NoError(t, err)

// make sure each chunk spans max 'sync period' time
Expand Down
30 changes: 13 additions & 17 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded"
errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
)

// RingCount is the interface exposed by a ring implementation which allows
Expand Down Expand Up @@ -37,32 +37,28 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor
// AssertMaxStreamsPerUser checks the given number of streams against the per-user
// streams limit and returns an error if that limit has been reached.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
actualLimit := l.maxStreamsPerUser(userID)
if streams < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) maxStreamsPerUser(userID string) int {
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)

// We assume that streams are evenly distributed across ingesters,
// so we convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)

// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)

// If both the local and global limits are disabled, we just
// use the largest int value
if localLimit == 0 {
localLimit = math.MaxInt32
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}

if streams < calculatedLimit {
return nil
}

return localLimit
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
Expand Down
104 changes: 39 additions & 65 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,112 +11,86 @@ import (
"github.com/grafana/loki/pkg/util/validation"
)

func TestLimiter_maxStreamsPerUser(t *testing.T) {
func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
expected int
streams int
expected error
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
streams: 100,
expected: nil,
},
"current number of streams is below the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 299,
expected: nil,
},
"current number of streams is above the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 300,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 300, 300, 0, 1000, 300),
},
"both local and global limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: math.MaxInt32,
streams: math.MaxInt32 - 1,
expected: nil,
},
"only local limit is enabled": {
maxLocalStreamsPerUser: 1000,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: 1000,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 1000, 1000, 0, 0),
},
"only global limit is enabled with replication-factor=1": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 1,
ringIngesterCount: 10,
expected: 100,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 100, 0, 1000, 100),
},
"only global limit is enabled with replication-factor=3": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 0, 1000, 300),
},
"both local and global limits are set with local limit < global limit": {
maxLocalStreamsPerUser: 150,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 150,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 150, 150, 1000, 300),
},
"both local and global limits are set with local limit > global limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
// Mock the ring
ring := &ringCountMock{count: testData.ringIngesterCount}

// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
actual := limiter.maxStreamsPerUser("test")

assert.Equal(t, testData.expected, actual)
})
}
}

func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
streams int
expected error
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
streams: 100,
expected: nil,
},
"current number of streams is below the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 299,
expected: nil,
},
"current number of streams is above the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 300,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, 0, 1000, 300),
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (
RateLimited = "rate_limited"
// LineTooLong is a reason for discarding too long log lines.
LineTooLong = "line_too_long"
// StreamLimit is a reason for discarding lines when we can't create a new stream
// because the limit of active streams has been reached.
StreamLimit = "stream_limit"
)

// DiscardedBytes is a metric of the total discarded bytes, by reason.
Expand Down

0 comments on commit 0ab1b28

Please sign in to comment.