From 882bfb7cc1c0a89bf17854d8e8be501254837b99 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 1 May 2022 12:31:02 -0400
Subject: [PATCH 1/5] don't create more streams when live tailing logs;
 process the log line with the pipeline to allow mutating the content of the
 line, but don't split it into new streams.

Signed-off-by: Edward Welch
---
 pkg/ingester/tailer.go | 42 ++++++++++--------------------------------
 1 file changed, 10 insertions(+), 32 deletions(-)

diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 4d034d7acb37b..0d4496a745330 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -121,53 +121,31 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 		return
 	}
 
-	streams := t.processStream(stream, lbs)
-	if len(streams) == 0 {
-		return
-	}
-	for _, s := range streams {
-		select {
-		case t.sendChan <- s:
-		default:
-			t.dropStream(*s)
-		}
+	t.processStream(&stream, lbs)
+	select {
+	case t.sendChan <- &stream:
+	default:
+		t.dropStream(stream)
 	}
 }
 
-func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
+func (t *tailer) processStream(stream *logproto.Stream, lbs labels.Labels) {
 	// Optimization: skip filtering entirely, if no filter is set
 	if log.IsNoopPipeline(t.pipeline) {
-		return []*logproto.Stream{&stream}
+		return
 	}
 	// pipeline are not thread safe and tailer can process multiple stream at once.
 	t.pipelineMtx.Lock()
 	defer t.pipelineMtx.Unlock()
 
-	streams := map[uint64]*logproto.Stream{}
-
 	sp := t.pipeline.ForStream(lbs)
-	for _, e := range stream.Entries {
-		newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
+	for i, e := range stream.Entries {
+		newLine, _, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
 		if !ok {
 			continue
 		}
-		var stream *logproto.Stream
-		if stream, ok = streams[parsedLbs.Hash()]; !ok {
-			stream = &logproto.Stream{
-				Labels: parsedLbs.String(),
-			}
-			streams[parsedLbs.Hash()] = stream
-		}
-		stream.Entries = append(stream.Entries, logproto.Entry{
-			Timestamp: e.Timestamp,
-			Line:      newLine,
-		})
-	}
-	streamsResult := make([]*logproto.Stream, 0, len(streams))
-	for _, stream := range streams {
-		streamsResult = append(streamsResult, stream)
+		stream.Entries[i].Line = newLine
 	}
-	return streamsResult
 }
 
 // isMatching returns true if lbs matches all matchers.

From 3efbb55168b85702be744d36928308f58156e9a1 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 1 May 2022 14:37:00 -0400
Subject: [PATCH 2/5] update changelog

Signed-off-by: Edward Welch
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0bb8f1282e01..a4cf7b6070a89 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -69,6 +69,7 @@
 ##### Fixes
 * [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
 ##### Changes
+* [6063](https://github.com/grafana/loki/pull/6063) **slim-bean**: Changes tailing API responses to not split a stream when using parsers in the query
 * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
 * [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
 * [5650](https://github.com/grafana/loki/pull/5650) **cyriltovena**: Remove more chunkstore and schema version below v9

From fbd57a71c7da7ea6c20d30107d46eb76b6f8f214 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 2 May 2022 08:16:58 -0400
Subject: [PATCH 3/5] don't mutate the incoming streams

Signed-off-by: Edward Welch
---
 pkg/ingester/tailer.go | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 0d4496a745330..4e27d23f6dd6a 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -121,31 +121,40 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 		return
 	}
 
-	t.processStream(&stream, lbs)
+	processed := t.processStream(stream, lbs)
 	select {
-	case t.sendChan <- &stream:
+	case t.sendChan <- processed:
 	default:
 		t.dropStream(stream)
 	}
 }
 
-func (t *tailer) processStream(stream *logproto.Stream, lbs labels.Labels) {
+func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) *logproto.Stream {
 	// Optimization: skip filtering entirely, if no filter is set
 	if log.IsNoopPipeline(t.pipeline) {
-		return
+		return &stream
 	}
 	// pipeline are not thread safe and tailer can process multiple stream at once.
 	t.pipelineMtx.Lock()
 	defer t.pipelineMtx.Unlock()
 
+	responseStream := &logproto.Stream{
+		Labels:  stream.Labels,
+		Entries: make([]logproto.Entry, 0, len(stream.Entries)),
+	}
 	sp := t.pipeline.ForStream(lbs)
-	for i, e := range stream.Entries {
+	for _, e := range stream.Entries {
 		newLine, _, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
 		if !ok {
+			responseStream.Entries = append(responseStream.Entries, e)
 			continue
 		}
-		stream.Entries[i].Line = newLine
+		responseStream.Entries = append(responseStream.Entries, logproto.Entry{
+			Timestamp: e.Timestamp,
+			Line:      newLine,
+		})
 	}
+	return responseStream
 }
 
 // isMatching returns true if lbs matches all matchers.

From 1b15c108634d510479f73e199d6a31154c659142 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 2 May 2022 12:55:36 -0400
Subject: [PATCH 4/5] do not reuse the pipeline because it caches labels

Signed-off-by: Edward Welch
---
 pkg/ingester/tailer.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 4e27d23f6dd6a..4a16f155de47f 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -28,8 +28,7 @@ type tailer struct {
 	id       uint32
 	orgID    string
 	matchers []*labels.Matcher
-	pipeline syntax.Pipeline
-	expr     syntax.Expr
+	expr     syntax.LogSelectorExpr
 	pipelineMtx sync.Mutex
 
 	sendChan chan *logproto.Stream
@@ -52,7 +51,9 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	if err != nil {
 		return nil, err
 	}
-	pipeline, err := expr.Pipeline()
+	// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
+	// this error, so make sure we handle it here.
+	_, err = expr.Pipeline()
 	if err != nil {
 		return nil, err
 	}
@@ -61,7 +62,6 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	return &tailer{
 		orgID:          orgID,
 		matchers:       matchers,
-		pipeline:       pipeline,
 		sendChan:       make(chan *logproto.Stream, bufferSizeForTailResponse),
 		conn:           conn,
 		droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
@@ -130,8 +130,13 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 }
 
 func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) *logproto.Stream {
+	// Build a new pipeline for each call because the pipeline builds a cache of labels,
+	// and if we don't start with a new pipeline that cache will grow unbounded.
+	// The error is ignored because it was already checked in the constructor of the tailer.
+	pipeline, _ := t.expr.Pipeline()
+
 	// Optimization: skip filtering entirely, if no filter is set
-	if log.IsNoopPipeline(t.pipeline) {
+	if log.IsNoopPipeline(pipeline) {
 		return &stream
 	}
 	// pipeline are not thread safe and tailer can process multiple stream at once.
@@ -142,11 +147,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) *logpr
 		Labels:  stream.Labels,
 		Entries: make([]logproto.Entry, 0, len(stream.Entries)),
 	}
-	sp := t.pipeline.ForStream(lbs)
+	sp := pipeline.ForStream(lbs)
 	for _, e := range stream.Entries {
 		newLine, _, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
 		if !ok {
-			responseStream.Entries = append(responseStream.Entries, e)
 			continue
 		}
 		responseStream.Entries = append(responseStream.Entries, logproto.Entry{

From 996f8cb655863f30774859f5ae8f333383f6735a Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 2 May 2022 13:11:11 -0400
Subject: [PATCH 5/5] add note to upgrade guide

Signed-off-by: Edward Welch
---
 docs/sources/upgrading/_index.md | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md
index 4b012c2b7bb74..5dc4f8dfabd8e 100644
--- a/docs/sources/upgrading/_index.md
+++ b/docs/sources/upgrading/_index.md
@@ -31,6 +31,30 @@ The output is incredibly verbose as it shows the entire internal config struct u
 
 ## Main / Unreleased
 
+### Loki
+
+#### Tail API no longer creates multiple streams when using parsers
+
+We expect this change to have little impact; however, it is a breaking change to existing behavior.
+
+It is likely to affect only machine-to-machine uses of Loki's tail API where a client
+expects a parser in a query to alter the streams in a tail response.
+
+Prior to this change, a tail request with a parser (e.g. json, logfmt, regexp, pattern) would split the
+incoming log stream into multiple streams based on the labels extracted by the parser.
+
+[PR 6063](https://github.com/grafana/loki/pull/6063) changes this behavior
+to no longer split incoming streams when a parser is used in the query; instead, Loki returns exactly
+the same streams with and without a parser in the query.
+
+We found a significant performance impact when using parsers on live tailing queries, because they
+would turn a single stream with multiple entries into multiple streams with single entries,
+often leaving the tailing client unable to keep up with the number of streams being pushed
+and causing tailed logs to be dropped.
+
+This change has no impact on viewing the tail output from Grafana or logcli.
+Parsers can still be used for filtering and reformatting of live tailed log lines.
+
 ## 2.5.0
 
 ### Loki
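
To make the net effect of patches 1 and 3 concrete, here is a minimal, self-contained Go sketch of the
new tailer behavior. It uses simplified stand-in types and a stand-in process function rather than Loki's
actual logproto and log packages: entries still pass through the pipeline, so lines can be filtered out or
rewritten, but the response keeps the original stream labels and is never split into new streams.

package main

import "fmt"

// Entry and Stream are simplified stand-ins for logproto.Entry and logproto.Stream.
type Entry struct {
	TimestampNano int64
	Line          string
}

type Stream struct {
	Labels  string
	Entries []Entry
}

// process is a stand-in for pipeline.ForStream(lbs).ProcessString: it may rewrite
// the line (e.g. a line_format stage) and reports whether the entry passed the
// pipeline's filters.
func process(ts int64, line string) (string, bool) {
	if line == "" {
		return "", false // filtered out
	}
	return "processed: " + line, true
}

// processStream mirrors the post-PR behavior: one input stream in, at most one
// output stream out, with the original labels preserved.
func processStream(stream Stream) *Stream {
	out := &Stream{
		Labels:  stream.Labels, // labels are never rewritten or split
		Entries: make([]Entry, 0, len(stream.Entries)),
	}
	for _, e := range stream.Entries {
		newLine, ok := process(e.TimestampNano, e.Line)
		if !ok {
			continue // entry filtered out, but no new stream is ever created
		}
		out.Entries = append(out.Entries, Entry{TimestampNano: e.TimestampNano, Line: newLine})
	}
	return out
}

func main() {
	in := Stream{
		Labels:  `{job="app"}`,
		Entries: []Entry{{1, `level=info msg="a"`}, {2, ""}, {3, `level=error msg="b"`}},
	}
	out := processStream(in)
	fmt.Println(out.Labels, len(out.Entries)) // {job="app"} 2 -- same labels, fewer entries
}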
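
The rationale for patch 4 is that the pipeline memoizes state per label set, so a tailer that reused one
pipeline for the life of a tail would accumulate cache entries without bound. The sketch below illustrates
only the shape of that leak with a hypothetical cachingPipeline type; it is not Loki's actual pipeline
implementation.

package main

import "fmt"

// cachingPipeline is a hypothetical stand-in for a pipeline that memoizes
// per-label-set state, in the spirit of the "cache of labels" named in the
// patch 4 commit message.
type cachingPipeline struct {
	cache map[string]int // key: label set, value: stand-in for cached stream state
}

func newCachingPipeline() *cachingPipeline {
	return &cachingPipeline{cache: map[string]int{}}
}

func (p *cachingPipeline) forStream(labels string) {
	if _, ok := p.cache[labels]; !ok {
		p.cache[labels] = len(p.cache) // one entry per distinct label set, never evicted
	}
}

func main() {
	// Reusing one pipeline across a long-lived tail: every distinct label set
	// seen adds a cache entry that is never released.
	shared := newCachingPipeline()
	for i := 0; i < 100000; i++ {
		shared.forStream(fmt.Sprintf(`{job="app", instance="%d"}`, i))
	}
	fmt.Println("shared pipeline cache size:", len(shared.cache)) // 100000 and growing

	// Rebuilding the pipeline per processed stream (as patch 4 does with
	// t.expr.Pipeline()) bounds the cache to the label sets of that one call.
	for i := 0; i < 100000; i++ {
		fresh := newCachingPipeline()
		fresh.forStream(fmt.Sprintf(`{job="app", instance="%d"}`, i))
		// fresh goes out of scope here and its cache can be garbage collected.
	}
	fmt.Println("per-call pipeline cache size: always 1")
}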