Added max streams per user global limit (#1493)
* Added max streams per user global limit

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated changelog

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pracucci and cyriltovena committed Jan 9, 2020
1 parent ce407d3 commit ec40515
Showing 9 changed files with 334 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@

### Features

* [1493](https://github.com/grafana/loki/pull/1493) **pracucci**: pkg/ingester: added a per-cluster limit on the maximum number of streams per user, configured via the `max_global_streams_per_user` config option.
* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. If the positions YAML is corrupted, an empty positions config is used and will later overwrite the malformed file.
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Added `global` ingestion rate limiter strategy support.

8 changes: 7 additions & 1 deletion docs/configuration/README.md
@@ -762,9 +762,15 @@ logs in Loki.
# Enforce every sample has a metric name.
[enforce_metric_name: <boolean> | default = true]
# Maximum number of active streams per user.
# Maximum number of active streams per user, per ingester. 0 to disable.
[max_streams_per_user: <int> | default = 10000]
# Maximum number of active streams per user, across the cluster. 0 to disable.
# When the global limit is enabled, each ingester is configured with a dynamic
# local limit based on the replication factor and the current number of healthy
# ingesters, and is kept updated whenever the number of ingesters changes.
[max_global_streams_per_user: <int> | default = 0]
# Maximum number of chunks that can be fetched by a single query.
[max_chunks_per_query: <int> | default = 2000000]
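
To make the dynamic local limit described above concrete, here is a minimal sketch (not part of this commit; the function names are illustrative only) of the conversion it performs, namely (global limit / healthy ingesters) * replication factor:

```go
package main

import "fmt"

// globalToLocalLimit mirrors the conversion described above:
// (global limit / number of healthy ingesters) * replication factor.
func globalToLocalLimit(globalLimit, healthyIngesters, replicationFactor int) int {
	if globalLimit == 0 || healthyIngesters == 0 {
		// Limit disabled, or the ingester count is not yet known.
		return 0
	}
	return int(float64(globalLimit) / float64(healthyIngesters) * float64(replicationFactor))
}

func main() {
	// E.g. max_global_streams_per_user: 10000, 5 healthy ingesters, replication factor 3.
	fmt.Println(globalToLocalLimit(10000, 5, 3)) // 6000 streams allowed per ingester
}
```

Because each stream is replicated to `replication factor` ingesters, scaling the per-ingester share back up by that factor keeps the effective cluster-wide total close to the configured global value.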
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
@@ -99,7 +99,7 @@ type Ingester struct {
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

limits *validation.Overrides
limiter *Limiter
factory func() chunkenc.Chunk
}

@@ -126,7 +126,6 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
limits: limits,
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
@@ -145,6 +144,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid

i.lifecycler.Start()

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

i.done.Add(1)
go i.loop()

@@ -208,7 +211,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.factory, i.limits, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
inst = newInstance(instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
i.instances[instanceID] = inst
}
return inst
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
@@ -189,7 +189,7 @@ func TestIngester(t *testing.T) {
func TestIngesterStreamLimitExceeded(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
defaultLimits.MaxStreamsPerUser = 1
defaultLimits.MaxLocalStreamsPerUser = 1
overrides, err := validation.NewOverrides(defaultLimits)

require.NoError(t, err)
13 changes: 7 additions & 6 deletions pkg/ingester/instance.go
@@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const queryBatchSize = 128
@@ -66,15 +65,15 @@ type instance struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

limits *validation.Overrides
limiter *Limiter
factory func() chunkenc.Chunk

// sync
syncPeriod time.Duration
syncMinUtil float64
}

func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides, syncPeriod time.Duration, syncMinUtil float64) *instance {
func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@@ -85,7 +84,7 @@ func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *valid

factory: factory,
tailers: map[uint32]*tailer{},
limits: limits,
limiter: limiter,

syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
@@ -160,9 +159,11 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}

sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.factory)
i.streams[fp] = stream
15 changes: 9 additions & 6 deletions pkg/ingester/instance_test.go
@@ -24,10 +24,11 @@ var defaultFactory = func() chunkenc.Chunk {
}

func TestLabelsCollisions(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance("test", defaultFactory, o, 0, 0)
i := newInstance("test", defaultFactory, limiter, 0, 0)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -50,10 +51,11 @@ func TestLabelsCollisions(t *testing.T) {
}

func TestConcurrentPushes(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance("test", defaultFactory, o, 0, 0)
inst := newInstance("test", defaultFactory, limiter, 0, 0)

const (
concurrent = 10
@@ -100,8 +102,9 @@ func TestConcurrentPushes(t *testing.T) {
}

func TestSyncPeriod(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

const (
syncPeriod = 1 * time.Minute
@@ -110,7 +113,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst := newInstance("test", defaultFactory, o, syncPeriod, minUtil)
inst := newInstance("test", defaultFactory, limiter, syncPeriod, minUtil)
lbls := makeRandomLabels()

tt := time.Now()
94 changes: 94 additions & 0 deletions pkg/ingester/limiter.go
@@ -0,0 +1,94 @@
package ingester

import (
"fmt"
"math"

"github.com/grafana/loki/pkg/util/validation"
)

const (
errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded"
)

// RingCount is the interface exposed by a ring implementation, used to
// count its healthy members.
type RingCount interface {
HealthyInstancesCount() int
}

// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant
type Limiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int
}

// NewLimiter makes a new limiter
func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
}
}

// AssertMaxStreamsPerUser checks whether the given number of streams is still below
// the per-user limit and returns an error if the limit has been reached.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
actualLimit := l.maxStreamsPerUser(userID)
if streams < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) maxStreamsPerUser(userID string) int {
localLimit := l.limits.MaxLocalStreamsPerUser(userID)

// We can assume that streams are evenly distributed across ingesters,
// so we convert the global limit into a local one.
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))

// If both the local and global limits are disabled, we just
// fall back to a very large value (math.MaxInt32)
if localLimit == 0 {
localLimit = math.MaxInt32
}

return localLimit
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}

// Given we don't need a super accurate count (i.e. when the ingesters
// topology changes) and we prefer to err in favor of the tenant,
// we can use a per-ingester limit equal to:
// (global limit / number of ingesters) * replication factor
numIngesters := l.ring.HealthyInstancesCount()

// The healthy instances count is updated asynchronously and may be zero.
// If that happens, we just temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}

return 0
}

func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}

return first
}
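
As a usage illustration only (not part of this commit), here is a sketch of exercising the limiter with a fake ring, in the spirit of the `ringCountMock` used by the tests above; the `MaxGlobalStreamsPerUser` field name is an assumption, mirroring the `MaxLocalStreamsPerUser` field shown in the tests.

```go
package ingester

import (
	"fmt"

	"github.com/grafana/loki/pkg/util/validation"
)

// fakeRing is a hypothetical RingCount implementation used only for this sketch.
type fakeRing struct{ healthy int }

func (r *fakeRing) HealthyInstancesCount() int { return r.healthy }

func ExampleLimiter() {
	// Assumed field name, mirroring MaxLocalStreamsPerUser above.
	limits, err := validation.NewOverrides(validation.Limits{MaxGlobalStreamsPerUser: 9000})
	if err != nil {
		panic(err)
	}

	// With 3 healthy ingesters and replication factor 3, each ingester
	// accepts up to (9000 / 3) * 3 = 9000 streams for this tenant.
	limiter := NewLimiter(limits, &fakeRing{healthy: 3}, 3)

	fmt.Println(limiter.AssertMaxStreamsPerUser("tenant-a", 100))  // <nil>: below the limit
	fmt.Println(limiter.AssertMaxStreamsPerUser("tenant-a", 9000)) // error: limit reached
}
```

Because the per-ingester limit already accounts for replication, hitting it on any single ingester implies the tenant is at (or near) the cluster-wide cap.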