// Patch summary: adds an ingester Config field MaxDroppedStreams (yaml "max_dropped_streams",
// flag "ingester.tailer.max-dropped-streams", default 10) and passes it from Ingester.Tail to newTailer.
// newTailer pre-allocates droppedStreams with that capacity; dropStream logs and resets the slice to nil
// when len(droppedStreams) >= maxDroppedStreams, then appends the new entry.
// Existing newTailer call sites in tests and benchmarks are updated to pass 10, and Test_dropstream is added.
// NOTE(review): make(..., 0, maxDroppedStreams) panics at run time for a negative value, and this patch
// shows no validation of the setting — confirm Config.Validate (or a caller) rejects negatives.
// NOTE(review): with a value of 0 the reset branch fires on every drop, so only the latest entry is
// kept — confirm that is intended.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index def6e16f74211..3359a612050ef 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -90,6 +90,8 @@ type Config struct { Wrapper Wrapper `yaml:"-"` IndexShards int `yaml:"index_shards"` + + MaxDroppedStreams int `yaml:"max_dropped_streams"` } // RegisterFlags registers the flags. @@ -113,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.") f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`") f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. 
This MUST be evenly divisible by ALL schema shard factors or Loki will not start.") + f.IntVar(&cfg.MaxDroppedStreams, "ingester.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing") } func (cfg *Config) Validate() error { @@ -811,7 +814,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ } instance := i.GetOrCreateInstance(instanceID) - tailer, err := newTailer(instanceID, req.Query, queryServer) + tailer, err := newTailer(instanceID, req.Query, queryServer, i.cfg.MaxDroppedStreams) if err != nil { return err } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 5918e443cc999..444f158d05f77 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -420,7 +420,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) - t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) + t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10) require.NoError(b, err) for i := 0; i < 10000; i++ { require.NoError(b, inst.Push(ctx, &logproto.PushRequest{ diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index b773998c6e84b..19a949964649b 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -407,7 +407,7 @@ func Benchmark_PushStream(b *testing.B) { limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics) - t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}) + t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10) require.NoError(b, err) go t.loop() diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 
57558a4eecacc..d2289ce447df1 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -39,14 +39,15 @@ type tailer struct { closeChan chan struct{} closeOnce sync.Once - blockedAt *time.Time - blockedMtx sync.RWMutex - droppedStreams []*logproto.DroppedStream + blockedAt *time.Time + blockedMtx sync.RWMutex + droppedStreams []*logproto.DroppedStream + maxDroppedStreams int conn TailServer } -func newTailer(orgID, query string, conn TailServer) (*tailer, error) { +func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*tailer, error) { expr, err := logql.ParseLogSelector(query, true) if err != nil { return nil, err @@ -58,15 +59,16 @@ func newTailer(orgID, query string, conn TailServer) (*tailer, error) { matchers := expr.Matchers() return &tailer{ - orgID: orgID, - matchers: matchers, - pipeline: pipeline, - sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse), - conn: conn, - droppedStreams: []*logproto.DroppedStream{}, - id: generateUniqueID(orgID, query), - closeChan: make(chan struct{}), - expr: expr, + orgID: orgID, + matchers: matchers, + pipeline: pipeline, + sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse), + conn: conn, + droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams), + maxDroppedStreams: maxDroppedStreams, + id: generateUniqueID(orgID, query), + closeChan: make(chan struct{}), + expr: expr, }, nil } @@ -219,6 +221,11 @@ func (t *tailer) dropStream(stream logproto.Stream) { t.blockedAt = &blockedAt } + if len(t.droppedStreams) >= t.maxDroppedStreams { + level.Info(util_log.Logger).Log("msg", "tailer dropped streams is reset", "length", len(t.droppedStreams)) + t.droppedStreams = nil + } + t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{ From: stream.Entries[0].Timestamp, To: stream.Entries[len(stream.Entries)-1].Timestamp, diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index f1d47d8c4d5d1..9b06e4560018a 100644 --- 
a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -26,7 +26,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { } for run := 0; run < runs; run++ { - tailer, err := newTailer("org-id", stream.Labels, nil) + tailer, err := newTailer("org-id", stream.Labels, nil, 10) require.NoError(t, err) require.NotNil(t, tailer) @@ -49,13 +49,57 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { } } +func Test_dropstream(t *testing.T) { + maxDroppedStreams := 10 + + entry := logproto.Entry{Timestamp: time.Now(), Line: "foo"} + + cases := []struct { + name string + drop int + expected int + }{ + { + name: "less than maxDroppedStreams", + drop: maxDroppedStreams - 2, + expected: maxDroppedStreams - 2, + }, + { + name: "equal to maxDroppedStreams", + drop: maxDroppedStreams, + expected: maxDroppedStreams, + }, + { + name: "greater than maxDroppedStreams", + drop: maxDroppedStreams + 2, + expected: 2, // once the buffer is full the next drop resets it, so only the 2 overflow drops remain + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, maxDroppedStreams) + require.NoError(t, err) + + for i := 0; i < c.drop; i++ { + tail.dropStream(logproto.Stream{ + Entries: []logproto.Entry{ + entry, + }, + }) + } + assert.Equal(t, c.expected, len(tail.droppedStreams)) + }) + } +} + type fakeTailServer struct{} func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil } func (f *fakeTailServer) Context() context.Context { return context.Background() } func Test_TailerSendRace(t *testing.T) { - tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}) + tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10) require.NoError(t, err) var wg sync.WaitGroup