diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0bb8f1282e01..a4cf7b6070a89 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -69,6 +69,7 @@
 ##### Fixes
 * [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
 ##### Changes
+* [6063](https://github.com/grafana/loki/pull/6063) **slim-bean**: Change tail API responses to no longer split a stream when using parsers in the query
 * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
 * [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
 * [5650](https://github.com/grafana/loki/pull/5650) **cyriltovena**: Remove more chunkstore and schema version below v9
diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md
index 4b012c2b7bb74..5dc4f8dfabd8e 100644
--- a/docs/sources/upgrading/_index.md
+++ b/docs/sources/upgrading/_index.md
@@ -31,6 +31,30 @@ The output is incredibly verbose as it shows the entire internal config struct u
 
 ## Main / Unreleased
 
+### Loki
+
+#### Tail API no longer creates multiple streams when using parsers
+
+We expect this change to have little practical impact; however, it is a breaking change to existing behavior.
+
+This change will likely only affect anyone doing machine-to-machine work with Loki's tail API
+who expects a parser in a query to alter the streams in a tail response.
+
+Prior to this change, a tail request with a parser (e.g. json, logfmt, regexp, pattern) would split the
+incoming log stream into multiple streams based on the labels extracted by the parser.
+
+[PR 6063](https://github.com/grafana/loki/pull/6063) changes this behavior
+so that incoming streams are no longer split when a parser is used in the query; instead, Loki returns
+exactly the same streams with and without a parser in the query.
+
+We found a significant performance impact when using parsers on live tailing queries: a single stream
+with multiple entries would be turned into multiple streams with single entries,
+often leading to cases where the tailing client could not keep up with the number of streams
+being pushed and tailed logs were dropped.
+
+This change has no impact on viewing tail output in Grafana or logcli.
+Parsers can still be used to filter and reformat live tailed log lines.
+
 ## 2.5.0
 
 ### Loki
diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 4d034d7acb37b..4a16f155de47f 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -28,8 +28,7 @@ type tailer struct {
 	id       uint32
 	orgID    string
 	matchers []*labels.Matcher
-	pipeline syntax.Pipeline
-	expr     syntax.Expr
+	expr     syntax.LogSelectorExpr
 	pipelineMtx sync.Mutex
 
 	sendChan chan *logproto.Stream
@@ -52,7 +51,9 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	if err != nil {
 		return nil, err
 	}
-	pipeline, err := expr.Pipeline()
+	// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
+	// this error, so make sure we handle it here.
+	_, err = expr.Pipeline()
 	if err != nil {
 		return nil, err
 	}
@@ -61,7 +62,6 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	return &tailer{
 		orgID:          orgID,
 		matchers:       matchers,
-		pipeline:       pipeline,
 		sendChan:       make(chan *logproto.Stream, bufferSizeForTailResponse),
 		conn:           conn,
 		droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
@@ -121,53 +121,44 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 		return
 	}
 
-	streams := t.processStream(stream, lbs)
-	if len(streams) == 0 {
-		return
-	}
-
-	for _, s := range streams {
-		select {
-		case t.sendChan <- s:
-		default:
-			t.dropStream(*s)
-		}
+	processed := t.processStream(stream, lbs)
+	select {
+	case t.sendChan <- processed:
+	default:
+		t.dropStream(stream)
 	}
 }
 
-func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
+func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) *logproto.Stream {
+	// Build a new pipeline for each call because the pipeline builds a cache of labels
+	// and if we don't start with a new pipeline that cache will grow unbounded.
+	// The error is ignored because it would be handled in the constructor of the tailer.
+	pipeline, _ := t.expr.Pipeline()
+
 	// Optimization: skip filtering entirely, if no filter is set
-	if log.IsNoopPipeline(t.pipeline) {
-		return []*logproto.Stream{&stream}
+	if log.IsNoopPipeline(pipeline) {
+		return &stream
 	}
 
 	// pipeline are not thread safe and tailer can process multiple stream at once.
 	t.pipelineMtx.Lock()
 	defer t.pipelineMtx.Unlock()
 
-	streams := map[uint64]*logproto.Stream{}
-
-	sp := t.pipeline.ForStream(lbs)
+	responseStream := &logproto.Stream{
+		Labels:  stream.Labels,
+		Entries: make([]logproto.Entry, 0, len(stream.Entries)),
+	}
+	sp := pipeline.ForStream(lbs)
 	for _, e := range stream.Entries {
-		newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
+		newLine, _, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
 		if !ok {
 			continue
 		}
-		var stream *logproto.Stream
-		if stream, ok = streams[parsedLbs.Hash()]; !ok {
-			stream = &logproto.Stream{
-				Labels: parsedLbs.String(),
-			}
-			streams[parsedLbs.Hash()] = stream
-		}
-		stream.Entries = append(stream.Entries, logproto.Entry{
+		responseStream.Entries = append(responseStream.Entries, logproto.Entry{
 			Timestamp: e.Timestamp,
 			Line:      newLine,
 		})
 	}
-	streamsResult := make([]*logproto.Stream, 0, len(streams))
-	for _, stream := range streams {
-		streamsResult = append(streamsResult, stream)
-	}
-	return streamsResult
+	return responseStream
 }
 
 // isMatching returns true if lbs matches all matchers.
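As added context for reviewers, the contract after this patch can be summarized as "one stream in, one stream out": a parser in the tail query may still drop or rewrite lines, but it no longer regroups entries by parsed labels. The sketch below illustrates that contract with simplified stand-in types; it is not Loki's actual `logproto`/`syntax` code, and the `process` callback merely stands in for whatever pipeline the tail query builds.

```go
package main

import "fmt"

// entry and stream are simplified stand-ins for logproto.Entry and
// logproto.Stream, used only to illustrate the new tail behavior.
type entry struct {
	line string
}

type stream struct {
	labels  string
	entries []entry
}

// processStream mirrors the post-6063 behavior in spirit: keep the incoming
// stream's labels and let the pipeline only filter or rewrite individual
// lines. Before this change, entries were regrouped into one stream per set
// of parsed labels, which could fan a single stream out into many.
func processStream(in stream, process func(line string) (string, bool)) stream {
	out := stream{labels: in.labels, entries: make([]entry, 0, len(in.entries))}
	for _, e := range in.entries {
		if newLine, ok := process(e.line); ok {
			out.entries = append(out.entries, entry{line: newLine})
		}
	}
	return out
}

func main() {
	in := stream{
		labels: `{job="app"}`,
		entries: []entry{
			{line: `level=info msg="ok"`},
			{line: `level=error msg="boom"`},
		},
	}
	// The pipeline can still drop or rewrite lines, but the response keeps
	// the original stream labels: one stream in, one stream out.
	out := processStream(in, func(line string) (string, bool) { return line, true })
	fmt.Printf("%s -> %d entries in a single stream\n", out.labels, len(out.entries))
}
```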