From 01481cca0cc16c4b745a59ab74116d5fc7025f5b Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Mon, 13 Jun 2022 15:44:49 +0200 Subject: [PATCH] Promtail: Support all `cri-o` tags (multiline tags). (#6177) * Promtail: Support `cri-o` tags. Hacked something together that works during flight to Grafanafest. * Fix newCri test cases Signed-off-by: Kaviraj * Add test for real "panic" stacktrace Signed-off-by: Kaviraj * Add bounded size for partial lines slice Signed-off-by: Kaviraj * Leave the promtail config alone Signed-off-by: Kaviraj * PR remarks Signed-off-by: Kaviraj * PR remarks: Avoid discarding lines when partial lines reaches upper bound. Instead merge into single full line and print warning. Signed-off-by: Kaviraj --- clients/pkg/logentry/stages/extensions.go | 69 ++++++++++++- .../pkg/logentry/stages/extensions_test.go | 98 ++++++++++++++++++- clients/pkg/logentry/stages/pipeline.go | 17 ++++ 3 files changed, 178 insertions(+), 6 deletions(-) diff --git a/clients/pkg/logentry/stages/extensions.go b/clients/pkg/logentry/stages/extensions.go index c4f1194695ab0..3957292237b04 100644 --- a/clients/pkg/logentry/stages/extensions.go +++ b/clients/pkg/logentry/stages/extensions.go @@ -1,11 +1,17 @@ package stages import ( + "strings" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) -const RFC3339Nano = "RFC3339Nano" +const ( + RFC3339Nano = "RFC3339Nano" + MaxPartialLinesSize = 100 // Max buffer size to hold partial lines. +) // NewDocker creates a Docker json log format specific pipeline stage. func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { @@ -35,9 +41,50 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro return NewPipeline(logger, stages, nil, registerer) } +type cri struct { + // bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`) + partialLines []string + maxPartialLines int + base *Pipeline +} + +// implement Stage interface +func (c *cri) Name() string { + return "cri" +} + +// implements Stage interface +func (c *cri) Run(entry chan Entry) chan Entry { + entry = c.base.Run(entry) + + in := RunWithSkip(entry, func(e Entry) (Entry, bool) { + if e.Extracted["flags"] == "P" { + if len(c.partialLines) >= c.maxPartialLines { + // Merge existing partialLines + newPartialLine := e.Line + e.Line = strings.Join(c.partialLines, "\n") + level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize) + c.partialLines = c.partialLines[:0] + c.partialLines = append(c.partialLines, newPartialLine) + return e, false + } + c.partialLines = append(c.partialLines, e.Line) + return e, true + } + if len(c.partialLines) > 0 { + c.partialLines = append(c.partialLines, e.Line) + e.Line = strings.Join(c.partialLines, "\n") + c.partialLines = c.partialLines[:0] + } + return e, false + }) + + return in +} + // NewCRI creates a CRI format specific pipeline stage func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { - stages := PipelineStages{ + base := PipelineStages{ PipelineStage{ StageTypeRegex: RegexConfig{ Expression: "^(?s)(?P