Skip to content

Commit

Permalink
Automatically prune metrics from the /metrics output of the promtail …
Browse files Browse the repository at this point in the history
…metrics pipeline stage after an idle period.

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean committed Feb 12, 2020
1 parent 4aeeeea commit ebab59c
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 27 deletions.
7 changes: 7 additions & 0 deletions docs/clients/promtail/stages/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ type: Gauge
# defaulting to the metric's name if not present.
[source: <string>]

# Label values on metrics are dynamic which can cause exported metrics
# to go stale (for example when a stream stops receiving logs).
# To prevent unbounded growth of the /metrics endpoint any metrics which
# have not been updated within this time will be removed.
# Must be greater than or equal to '1s', if undefined default is '5m'
[max_idle_duration: <string>]

config:
# Filters down source data and only changes the metric
# if the targeted value exactly matches the provided string.
Expand Down
35 changes: 31 additions & 4 deletions pkg/logentry/metric/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metric

import (
"strings"
"time"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand Down Expand Up @@ -54,7 +55,7 @@ type Counters struct {
}

// NewCounters creates a new counter vec.
func NewCounters(name, help string, config interface{}) (*Counters, error) {
func NewCounters(name, help string, config interface{}, maxIdleSec int64) (*Counters, error) {
cfg, err := parseCounterConfig(config)
if err != nil {
return nil, err
Expand All @@ -65,12 +66,14 @@ func NewCounters(name, help string, config interface{}) (*Counters, error) {
}
return &Counters{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewCounter(prometheus.CounterOpts{
return &expiringCounter{prometheus.NewCounter(prometheus.CounterOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
}),
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
Expand All @@ -79,3 +82,27 @@ func NewCounters(name, help string, config interface{}) (*Counters, error) {
func (c *Counters) With(labels model.LabelSet) prometheus.Counter {
return c.metricVec.With(labels).(prometheus.Counter)
}

type expiringCounter struct {
prometheus.Counter
lastModSec int64
}

// Inc increments the counter by 1. Use Add to increment it by arbitrary
// non-negative values.
func (e *expiringCounter) Inc() {
e.Counter.Inc()
e.lastModSec = time.Now().Unix()
}

// Add adds the given value to the counter. It panics if the value is <
// 0.
func (e *expiringCounter) Add(val float64) {
e.Counter.Add(val)
e.lastModSec = time.Now().Unix()
}

// HasExpired implements Expireable
func (e *expiringCounter) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-e.lastModSec >= maxAgeSec
}
54 changes: 54 additions & 0 deletions pkg/logentry/metric/counters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package metric

import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

var (
Expand All @@ -13,6 +17,7 @@ var (
)

func Test_validateCounterConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
config CounterConfig
Expand Down Expand Up @@ -60,3 +65,52 @@ func Test_validateCounterConfig(t *testing.T) {
})
}
}

func TestCounterExpiration(t *testing.T) {
t.Parallel()
cfg := CounterConfig{
Action: "inc",
}

cnt, err := NewCounters("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)

// Create a label and increment the counter
lbl1 := model.LabelSet{}
lbl1["test"] = "i don't wanna make this a constant"
cnt.With(lbl1).Inc()

// Collect the metrics, should still find the metric in the map
collect(cnt)
assert.Contains(t, cnt.metrics, lbl1.Fingerprint())

time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec

//Add another counter with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "eat this linter"
cnt.With(lbl2).Inc()

// Collect the metrics, first counter should have expired and removed, second should still be present
collect(cnt)
assert.NotContains(t, cnt.metrics, lbl1.Fingerprint())
assert.Contains(t, cnt.metrics, lbl2.Fingerprint())
}

func collect(c prometheus.Collector) {
done := make(chan struct{})
collector := make(chan prometheus.Metric)

go func() {
defer close(done)
c.Collect(collector)
}()

for {
select {
case <-collector:
case <-done:
return
}
}
}
61 changes: 57 additions & 4 deletions pkg/logentry/metric/gauges.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metric

import (
"strings"
"time"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand Down Expand Up @@ -56,7 +57,7 @@ type Gauges struct {
}

// NewGauges creates a new gauge vec.
func NewGauges(name, help string, config interface{}) (*Gauges, error) {
func NewGauges(name, help string, config interface{}, maxIdleSec int64) (*Gauges, error) {
cfg, err := parseGaugeConfig(config)
if err != nil {
return nil, err
Expand All @@ -67,12 +68,14 @@ func NewGauges(name, help string, config interface{}) (*Gauges, error) {
}
return &Gauges{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewGauge(prometheus.GaugeOpts{
return &expiringGauge{prometheus.NewGauge(prometheus.GaugeOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
}),
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
Expand All @@ -81,3 +84,53 @@ func NewGauges(name, help string, config interface{}) (*Gauges, error) {
func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge {
return g.metricVec.With(labels).(prometheus.Gauge)
}

type expiringGauge struct {
prometheus.Gauge
lastModSec int64
}

// Set sets the Gauge to an arbitrary value.
func (g *expiringGauge) Set(val float64) {
g.Gauge.Set(val)
g.lastModSec = time.Now().Unix()
}

// Inc increments the Gauge by 1. Use Add to increment it by arbitrary
// values.
func (g *expiringGauge) Inc() {
g.Gauge.Inc()
g.lastModSec = time.Now().Unix()
}

// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary
// values.
func (g *expiringGauge) Dec() {
g.Gauge.Dec()
g.lastModSec = time.Now().Unix()
}

// Add adds the given value to the Gauge. (The value can be negative,
// resulting in a decrease of the Gauge.)
func (g *expiringGauge) Add(val float64) {
g.Gauge.Add(val)
g.lastModSec = time.Now().Unix()
}

// Sub subtracts the given value from the Gauge. (The value can be
// negative, resulting in an increase of the Gauge.)
func (g *expiringGauge) Sub(val float64) {
g.Gauge.Sub(val)
g.lastModSec = time.Now().Unix()
}

// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
func (g *expiringGauge) SetToCurrentTime() {
g.Gauge.SetToCurrentTime()
g.lastModSec = time.Now().Unix()
}

// HasExpired implements Expireable
func (g *expiringGauge) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-g.lastModSec >= maxAgeSec
}
40 changes: 40 additions & 0 deletions pkg/logentry/metric/gauges_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package metric

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

func TestGaugeExpiration(t *testing.T) {
t.Parallel()
cfg := GaugeConfig{
Action: "inc",
}

gag, err := NewGauges("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)

// Create a label and increment the gauge
lbl1 := model.LabelSet{}
lbl1["test"] = "app"
gag.With(lbl1).Inc()

// Collect the metrics, should still find the metric in the map
collect(gag)
assert.Contains(t, gag.metrics, lbl1.Fingerprint())

time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec

//Add another gauge with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "app2"
gag.With(lbl2).Inc()

// Collect the metrics, first gauge should have expired and removed, second should still be present
collect(gag)
assert.NotContains(t, gag.metrics, lbl1.Fingerprint())
assert.Contains(t, gag.metrics, lbl2.Fingerprint())
}
28 changes: 24 additions & 4 deletions pkg/logentry/metric/histograms.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metric

import (
"time"

"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -31,7 +33,7 @@ type Histograms struct {
}

// NewHistograms creates a new histogram vec.
func NewHistograms(name, help string, config interface{}) (*Histograms, error) {
func NewHistograms(name, help string, config interface{}, maxIdleSec int64) (*Histograms, error) {
cfg, err := parseHistogramConfig(config)
if err != nil {
return nil, err
Expand All @@ -42,13 +44,15 @@ func NewHistograms(name, help string, config interface{}) (*Histograms, error) {
}
return &Histograms{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewHistogram(prometheus.HistogramOpts{
return &expiringHistogram{prometheus.NewHistogram(prometheus.HistogramOpts{
Help: help,
Name: name,
ConstLabels: labels,
Buckets: cfg.Buckets,
})
}),
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
Expand All @@ -57,3 +61,19 @@ func NewHistograms(name, help string, config interface{}) (*Histograms, error) {
func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram {
return h.metricVec.With(labels).(prometheus.Histogram)
}

type expiringHistogram struct {
prometheus.Histogram
lastModSec int64
}

// Observe adds a single observation to the histogram.
func (h *expiringHistogram) Observe(val float64) {
h.Histogram.Observe(val)
h.lastModSec = time.Now().Unix()
}

// HasExpired implements Expireable
func (h *expiringHistogram) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-h.lastModSec >= maxAgeSec
}
38 changes: 38 additions & 0 deletions pkg/logentry/metric/histograms_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package metric

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

func TestHistogramExpiration(t *testing.T) {
t.Parallel()
cfg := HistogramConfig{}

hist, err := NewHistograms("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)

// Create a label and increment the histogram
lbl1 := model.LabelSet{}
lbl1["test"] = "app"
hist.With(lbl1).Observe(23)

// Collect the metrics, should still find the metric in the map
collect(hist)
assert.Contains(t, hist.metrics, lbl1.Fingerprint())

time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec

//Add another histogram with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "app2"
hist.With(lbl2).Observe(2)

// Collect the metrics, first histogram should have expired and removed, second should still be present
collect(hist)
assert.NotContains(t, hist.metrics, lbl1.Fingerprint())
assert.Contains(t, hist.metrics, lbl2.Fingerprint())
}
Loading

0 comments on commit ebab59c

Please sign in to comment.