Skip to content

Commit

Permalink
Promtail: Add metrics for journal target (#6105)
Browse files Browse the repository at this point in the history
* Add metrics for journal target

There are no metrics for the journal target yet, this adds two basic
ones:
- Total number of lines processed
- Total number of errors processing lines (with a reason)

Because of the way the journal works, tailing one file with many
possible process writing to it, I chose note to expose counts per unit.

* Add changelog entry

* Fix compilation of (unsupported) journalmanager target on windows

* Extract possible errors to constants & add tests
  • Loading branch information
RutgerKe authored Jun 11, 2022
1 parent 9218e46 commit 1794a76
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
Expand Down
19 changes: 19 additions & 0 deletions clients/cmd/promtail/promtail-journal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: /tmp/positions.yaml

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: journal
journal:
max_age: 12h
labels:
job: systemd-journal
relabel_configs:
- source_labels: ['__journal__systemd_unit']
target_label: 'unit'
13 changes: 11 additions & 2 deletions clients/pkg/promtail/targets/journal/journaltarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor strin
// JournalTarget tails systemd journal entries.
// nolint
type JournalTarget struct {
metrics *Metrics
logger log.Logger
handler api.EntryHandler
positions positions.Positions
Expand All @@ -104,6 +105,7 @@ type JournalTarget struct {

// NewJournalTarget configures a new JournalTarget.
func NewJournalTarget(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
positions positions.Positions,
Expand All @@ -113,6 +115,7 @@ func NewJournalTarget(
) (*JournalTarget, error) {

return journalTargetWithReader(
metrics,
logger,
handler,
positions,
Expand All @@ -125,6 +128,7 @@ func NewJournalTarget(
}

func journalTargetWithReader(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
pos positions.Positions,
Expand All @@ -147,6 +151,7 @@ func journalTargetWithReader(

until := make(chan time.Time)
t := &JournalTarget{
metrics: metrics,
logger: logger,
handler: handler,
positions: pos,
Expand Down Expand Up @@ -263,15 +268,16 @@ func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error)

bb, err := json.Marshal(entry.Fields)
if err != nil {
level.Error(t.logger).Log("msg", "could not marshal journal fields to JSON", "err", err)
level.Error(t.logger).Log("msg", "could not marshal journal fields to JSON", "err", err, "unit", entry.Fields["_SYSTEMD_UNIT"])
return journalEmptyStr, nil
}
msg = string(bb)
} else {
var ok bool
msg, ok = entry.Fields["MESSAGE"]
if !ok {
level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field")
level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field", "unit", entry.Fields["_SYSTEMD_UNIT"])
t.metrics.journalErrors.WithLabelValues(noMessageError).Inc()
return journalEmptyStr, nil
}
}
Expand All @@ -296,9 +302,12 @@ func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error)
}
if len(labels) == 0 {
// No labels, drop journal entry
level.Debug(t.logger).Log("msg", "received journal entry with no labels", "unit", entry.Fields["_SYSTEMD_UNIT"])
t.metrics.journalErrors.WithLabelValues(emptyLabelsError).Inc()
return journalEmptyStr, nil
}

t.metrics.journalLines.Inc()
t.positions.PutString(t.positionPath, entry.Cursor)
t.handler.Chan() <- api.Entry{
Labels: labels,
Expand Down
101 changes: 92 additions & 9 deletions clients/pkg/promtail/targets/journal/journaltarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package journal
import (
"io"
"os"
"strings"
"testing"
"time"

"github.com/coreos/go-systemd/sdjournal"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,12 +49,11 @@ func newMockJournalEntry(entry *sdjournal.JournalEntry) journalEntryFunc {
}
}

func (r *mockJournalReader) Write(msg string, fields map[string]string) {
func (r *mockJournalReader) Write(fields map[string]string) {
allFields := make(map[string]string, len(fields))
for k, v := range fields {
allFields[k] = v
}
allFields["MESSAGE"] = msg

ts := uint64(time.Now().UnixNano())

Expand Down Expand Up @@ -95,24 +97,104 @@ func TestJournalTarget(t *testing.T) {
err = yaml.Unmarshal([]byte(relabelCfg), &relabels)
require.NoError(t, err)

jt, err := journalTargetWithReader(logger, client, ps, "test", relabels,
registry := prometheus.NewRegistry()
jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,
&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
r.t = t

for i := 0; i < 10; i++ {
r.Write("ping", map[string]string{
r.Write(map[string]string{
"MESSAGE": "ping",
"CODE_FILE": "journaltarget_test.go",
})
assert.NoError(t, err)
}
require.NoError(t, jt.Stop())
client.Stop()

expectedMetrics := `# HELP promtail_journal_target_lines_total Total number of successful journal lines read
# TYPE promtail_journal_target_lines_total counter
promtail_journal_target_lines_total 10
`

if err := testutil.GatherAndCompare(registry,
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
assert.Len(t, client.Received(), 10)
}

func TestJournalTargetParsingErrors(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
positionsFileName := dirName + "/positions.yml"

// Set the sync period to a really long value, to guarantee the sync timer
// never runs, this way we know everything saved was done through channel
// notifications when target.stop() was called.
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Fatal(err)
}

client := fake.New(func() {})

// We specify no relabel rules, so that we end up with an empty labelset
var relabels []*relabel.Config

registry := prometheus.NewRegistry()
jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,
&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
r.t = t

// No labels but correct message
for i := 0; i < 10; i++ {
r.Write(map[string]string{
"MESSAGE": "ping",
"CODE_FILE": "journaltarget_test.go",
})
assert.NoError(t, err)
}

// No labels and no message
for i := 0; i < 10; i++ {
r.Write(map[string]string{
"CODE_FILE": "journaltarget_test.go",
})
assert.NoError(t, err)
}
require.NoError(t, jt.Stop())
client.Stop()

expectedMetrics := `# HELP promtail_journal_target_lines_total Total number of successful journal lines read
# TYPE promtail_journal_target_lines_total counter
promtail_journal_target_lines_total 0
# HELP promtail_journal_target_parsing_errors_total Total number of parsing errors while reading journal messages
# TYPE promtail_journal_target_parsing_errors_total counter
promtail_journal_target_parsing_errors_total{error="empty_labels"} 10
promtail_journal_target_parsing_errors_total{error="no_message"} 10
`

if err := testutil.GatherAndCompare(registry,
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

assert.Len(t, client.Received(), 0)
}

func TestJournalTarget_JSON(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down Expand Up @@ -147,15 +229,16 @@ func TestJournalTarget_JSON(t *testing.T) {

cfg := &scrapeconfig.JournalTargetConfig{JSON: true}

jt, err := journalTargetWithReader(logger, client, ps, "test", relabels,
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", relabels,
cfg, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
r.t = t

for i := 0; i < 10; i++ {
r.Write("ping", map[string]string{
r.Write(map[string]string{
"MESSAGE": "ping",
"CODE_FILE": "journaltarget_test.go",
"OTHER_FIELD": "foobar",
})
Expand Down Expand Up @@ -197,7 +280,7 @@ func TestJournalTarget_Since(t *testing.T) {
MaxAge: "4h",
}

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
&cfg, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

Expand Down Expand Up @@ -237,7 +320,7 @@ func TestJournalTarget_Cursor_TooOld(t *testing.T) {
RealtimeTimestamp: uint64(entryTs.UnixNano()),
})

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
&cfg, newMockJournalReader, journalEntry)
require.NoError(t, err)

Expand Down Expand Up @@ -277,7 +360,7 @@ func TestJournalTarget_Cursor_NotTooOld(t *testing.T) {
RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),
})

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
&cfg, newMockJournalReader, journalEntry)
require.NoError(t, err)

Expand Down
3 changes: 1 addition & 2 deletions clients/pkg/promtail/targets/journal/journaltargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package journal
import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/positions"
Expand All @@ -21,7 +20,7 @@ type JournalTargetManager struct{}
// NewJournalTargetManager returns nil as JournalTargets are not supported
// on this platform.
func NewJournalTargetManager(
reg prometheus.Registerer,
metrics *Metrics,
logger log.Logger,
positions positions.Positions,
client api.EntryHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package journal
import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/api"
Expand All @@ -20,11 +19,13 @@ import (
type JournalTargetManager struct {
logger log.Logger
targets map[string]*JournalTarget

metrics *Metrics
}

// NewJournalTargetManager creates a new JournalTargetManager.
func NewJournalTargetManager(
reg prometheus.Registerer,
metrics *Metrics,
logger log.Logger,
positions positions.Positions,
client api.EntryHandler,
Expand All @@ -33,19 +34,21 @@ func NewJournalTargetManager(
tm := &JournalTargetManager{
logger: logger,
targets: make(map[string]*JournalTarget),
metrics: metrics,
}

for _, cfg := range scrapeConfigs {
if cfg.JournalConfig == nil {
continue
}

pipeline, err := stages.NewPipeline(log.With(logger, "component", "journal_pipeline"), cfg.PipelineStages, &cfg.JobName, reg)
pipeline, err := stages.NewPipeline(log.With(logger, "component", "journal_pipeline"), cfg.PipelineStages, &cfg.JobName, metrics.reg)
if err != nil {
return nil, err
}

t, err := NewJournalTarget(
tm.metrics,
logger,
pipeline.Wrap(client),
positions,
Expand Down
43 changes: 43 additions & 0 deletions clients/pkg/promtail/targets/journal/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package journal

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds a set of journal target metrics.
type Metrics struct {
reg prometheus.Registerer

journalErrors *prometheus.CounterVec
journalLines prometheus.Counter
}

const (
noMessageError = "no_message"
emptyLabelsError = "empty_labels"
)

// NewMetrics creates a new set of journal target metrics. If reg is non-nil, the
// metrics will be registered.
func NewMetrics(reg prometheus.Registerer) *Metrics {
var m Metrics
m.reg = reg

m.journalErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "journal_target_parsing_errors_total",
Help: "Total number of parsing errors while reading journal messages",
}, []string{"error"})
m.journalLines = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "journal_target_lines_total",
Help: "Total number of successful journal lines read",
})

if reg != nil {
reg.MustRegister(
m.journalErrors,
m.journalLines,
)
}

return &m
}
Loading

0 comments on commit 1794a76

Please sign in to comment.