Skip to content

Commit

Permalink
Promtail: Support all cri-o tags (multiline tags). (#6177)
Browse files Browse the repository at this point in the history
* Promtail: Support `cri-o` tags.

Hacked something together that works during flight to Grafanafest.

* Fix newCri test cases

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Add test for real "panic" stacktrace

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Add bounded size for partial lines slice

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Leave the promtail config alone

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* PR remarks

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* PR remarks: Avoid discarding lines when partial lines reaches upper bound.

Instead merge into single full line and print warning.

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk authored Jun 13, 2022
1 parent a649e0d commit 01481cc
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 6 deletions.
69 changes: 66 additions & 3 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package stages

import (
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

const RFC3339Nano = "RFC3339Nano"
const (
RFC3339Nano = "RFC3339Nano"
MaxPartialLinesSize = 100 // Max buffer size to hold partial lines.
)

// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
Expand Down Expand Up @@ -35,9 +41,50 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
return NewPipeline(logger, stages, nil, registerer)
}

type cri struct {
// bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`)
partialLines []string
maxPartialLines int
base *Pipeline
}

// implement Stage interface
func (c *cri) Name() string {
return "cri"
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)

in := RunWithSkip(entry, func(e Entry) (Entry, bool) {
if e.Extracted["flags"] == "P" {
if len(c.partialLines) >= c.maxPartialLines {
// Merge existing partialLines
newPartialLine := e.Line
e.Line = strings.Join(c.partialLines, "\n")
level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize)
c.partialLines = c.partialLines[:0]
c.partialLines = append(c.partialLines, newPartialLine)
return e, false
}
c.partialLines = append(c.partialLines, e.Line)
return e, true
}
if len(c.partialLines) > 0 {
c.partialLines = append(c.partialLines, e.Line)
e.Line = strings.Join(c.partialLines, "\n")
c.partialLines = c.partialLines[:0]
}
return e, false
})

return in
}

// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
stages := PipelineStages{
base := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
Expression: "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
Expand All @@ -59,6 +106,22 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
"content",
},
},
PipelineStage{
StageTypeOutput: OutputConfig{
"tags",
},
},
}
return NewPipeline(logger, stages, nil, registerer)

p, err := NewPipeline(logger, base, nil, registerer)
if err != nil {
return nil, err
}

c := cri{
maxPartialLines: MaxPartialLinesSize,
base: p,
}
c.partialLines = make([]string, 0, c.maxPartialLines)
return &c, nil
}
98 changes: 95 additions & 3 deletions clients/pkg/logentry/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -87,6 +88,97 @@ var (
criTestTime2 = time.Now()
)

func TestCRI_tags(t *testing.T) {
cases := []struct {
name string
lines []string
expected []string
maxPartialLines int
err error
}{
{
name: "tag F",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout F some full line",
"2019-05-07T18:57:55.904275087+00:00 stdout F log",
},
expected: []string{"some full line", "log"},
},
{
name: "tag P",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2",
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
},
expected: []string{
"partial line 1\npartial line 2\nlog finished",
"another full log",
},
},
{
name: "tag P exceeding MaxPartialLinesSize lines",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4", // this exceeds the `MaxPartialLinesSize` of 3
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
},
maxPartialLines: 3,
expected: []string{
"partial line 1\npartial line 2\npartial line 3",
"partial line 4\nlog finished",
"another full log",
},
},
{
name: "panic",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P panic: I'm pannicing",
"2019-05-07T18:57:50.904275087+00:00 stdout P ",
"2019-05-07T18:57:50.904275087+00:00 stdout P goroutine 1 [running]:",
"2019-05-07T18:57:55.904275087+00:00 stdout P main.main()",
"2019-05-07T18:57:55.904275087+00:00 stdout F /home/kavirajk/src/go-play/main.go:11 +0x27",
},
expected: []string{
`panic: I'm pannicing
goroutine 1 [running]:
main.main()
/home/kavirajk/src/go-play/main.go:11 +0x27`,
},
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer)
require.NoError(t, err)

got := make([]string, 0)

// tweak `maxPartialLines`
if tt.maxPartialLines != 0 {
p.(*cri).maxPartialLines = tt.maxPartialLines
}

for _, line := range tt.lines {
out := processEntries(p, newEntry(nil, nil, line, time.Now()))
if len(out) > 0 {
for _, en := range out {
got = append(got, en.Line)

}
}
}
assert.Equal(t, tt.expected, got)
})
}
}

func TestNewCri(t *testing.T) {
tests := map[string]struct {
entry string
Expand All @@ -97,7 +189,7 @@ func TestNewCri(t *testing.T) {
expectedLabels map[string]string
}{
"happy path": {
criTestTimeStr + " stderr P message",
criTestTimeStr + " stderr F message",
"message",
time.Now(),
criTestTime,
Expand All @@ -107,7 +199,7 @@ func TestNewCri(t *testing.T) {
},
},
"multi line pass": {
criTestTimeStr + " stderr P message\nmessage2",
criTestTimeStr + " stderr F message\nmessage2",
"message\nmessage2",
time.Now(),
criTestTime,
Expand All @@ -117,7 +209,7 @@ func TestNewCri(t *testing.T) {
},
},
"invalid timestamp": {
"3242 stderr P message",
"3242 stderr F message",
"message",
criTestTime2,
criTestTime2,
Expand Down
17 changes: 17 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry {
return out
}

// RunWithSkip same as RunWith, except it skip sending it to output channel, if `process` functions returns `skip` true.
func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range input {
ee, skip := process(e)
if skip {
continue
}
out <- ee
}
}()

return out
}

// Run implements Stage
func (p *Pipeline) Run(in chan Entry) chan Entry {
in = RunWith(in, func(e Entry) Entry {
Expand Down

0 comments on commit 01481cc

Please sign in to comment.