diff --git a/docs/clients/promtail/stages/metrics.md b/docs/clients/promtail/stages/metrics.md index 6e45565bcae4d..a40c5938c01dc 100644 --- a/docs/clients/promtail/stages/metrics.md +++ b/docs/clients/promtail/stages/metrics.md @@ -70,6 +70,13 @@ type: Gauge # defaulting to the metric's name if not present. [source: ] +# Label values on metrics are dynamic which can cause exported metrics +# to go stale (for example when a stream stops receiving logs). +# To prevent unbounded growth of the /metrics endpoint any metrics which +# have not been updated within this time will be removed. +# Must be greater than or equal to '1s', if undefined default is '5m' +[max_idle_duration: ] + config: # Filters down source data and only changes the metric # if the targeted value exactly matches the provided string. diff --git a/pkg/logentry/metric/counters.go b/pkg/logentry/metric/counters.go index 2eedeee40ea02..d6a02cfbb443e 100644 --- a/pkg/logentry/metric/counters.go +++ b/pkg/logentry/metric/counters.go @@ -2,6 +2,7 @@ package metric import ( "strings" + "time" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -54,7 +55,7 @@ type Counters struct { } // NewCounters creates a new counter vec. -func NewCounters(name, help string, config interface{}) (*Counters, error) { +func NewCounters(name, help string, config interface{}, maxIdleSec int64) (*Counters, error) { cfg, err := parseCounterConfig(config) if err != nil { return nil, err @@ -65,12 +66,14 @@ func NewCounters(name, help string, config interface{}) (*Counters, error) { } return &Counters{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { - return prometheus.NewCounter(prometheus.CounterOpts{ + return &expiringCounter{prometheus.NewCounter(prometheus.CounterOpts{ Help: help, Name: name, ConstLabels: labels, - }) - }), + }), + 0, + } + }, maxIdleSec), Cfg: cfg, }, nil } @@ -79,3 +82,27 @@ func NewCounters(name, help string, config interface{}) (*Counters, error) { func (c *Counters) With(labels model.LabelSet) prometheus.Counter { return c.metricVec.With(labels).(prometheus.Counter) } + +type expiringCounter struct { + prometheus.Counter + lastModSec int64 +} + +// Inc increments the counter by 1. Use Add to increment it by arbitrary +// non-negative values. +func (e *expiringCounter) Inc() { + e.Counter.Inc() + e.lastModSec = time.Now().Unix() +} + +// Add adds the given value to the counter. It panics if the value is < +// 0. +func (e *expiringCounter) Add(val float64) { + e.Counter.Add(val) + e.lastModSec = time.Now().Unix() +} + +// HasExpired implements Expireable +func (e *expiringCounter) HasExpired(currentTimeSec int64, maxAgeSec int64) bool { + return currentTimeSec-e.lastModSec >= maxAgeSec +} diff --git a/pkg/logentry/metric/counters_test.go b/pkg/logentry/metric/counters_test.go index b4d5d80f4e7bf..681dd0f959aba 100644 --- a/pkg/logentry/metric/counters_test.go +++ b/pkg/logentry/metric/counters_test.go @@ -2,8 +2,12 @@ package metric import ( "testing" + "time" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" ) var ( @@ -13,6 +17,7 @@ var ( ) func Test_validateCounterConfig(t *testing.T) { + t.Parallel() tests := []struct { name string config CounterConfig @@ -60,3 +65,52 @@ func Test_validateCounterConfig(t *testing.T) { }) } } + +func TestCounterExpiration(t *testing.T) { + t.Parallel() + cfg := CounterConfig{ + Action: "inc", + } + + cnt, err := NewCounters("test1", "HELP ME!!!!!", cfg, 1) + assert.Nil(t, err) + + // Create a label and increment the counter + lbl1 := model.LabelSet{} + lbl1["test"] = "i don't wanna make this a constant" + cnt.With(lbl1).Inc() + + // Collect the metrics, should still find the metric in the map + collect(cnt) + assert.Contains(t, cnt.metrics, lbl1.Fingerprint()) + + time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec + + //Add another counter with new label val + lbl2 := model.LabelSet{} + lbl2["test"] = "eat this linter" + cnt.With(lbl2).Inc() + + // Collect the metrics, first counter should have expired and removed, second should still be present + collect(cnt) + assert.NotContains(t, cnt.metrics, lbl1.Fingerprint()) + assert.Contains(t, cnt.metrics, lbl2.Fingerprint()) +} + +func collect(c prometheus.Collector) { + done := make(chan struct{}) + collector := make(chan prometheus.Metric) + + go func() { + defer close(done) + c.Collect(collector) + }() + + for { + select { + case <-collector: + case <-done: + return + } + } +} diff --git a/pkg/logentry/metric/gauges.go b/pkg/logentry/metric/gauges.go index a9490f2515dc1..c136658eada38 100644 --- a/pkg/logentry/metric/gauges.go +++ b/pkg/logentry/metric/gauges.go @@ -2,6 +2,7 @@ package metric import ( "strings" + "time" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -56,7 +57,7 @@ type Gauges struct { } // NewGauges creates a new gauge vec. -func NewGauges(name, help string, config interface{}) (*Gauges, error) { +func NewGauges(name, help string, config interface{}, maxIdleSec int64) (*Gauges, error) { cfg, err := parseGaugeConfig(config) if err != nil { return nil, err @@ -67,12 +68,14 @@ func NewGauges(name, help string, config interface{}) (*Gauges, error) { } return &Gauges{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { - return prometheus.NewGauge(prometheus.GaugeOpts{ + return &expiringGauge{prometheus.NewGauge(prometheus.GaugeOpts{ Help: help, Name: name, ConstLabels: labels, - }) - }), + }), + 0, + } + }, maxIdleSec), Cfg: cfg, }, nil } @@ -81,3 +84,53 @@ func NewGauges(name, help string, config interface{}) (*Gauges, error) { func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge { return g.metricVec.With(labels).(prometheus.Gauge) } + +type expiringGauge struct { + prometheus.Gauge + lastModSec int64 +} + +// Set sets the Gauge to an arbitrary value. +func (g *expiringGauge) Set(val float64) { + g.Gauge.Set(val) + g.lastModSec = time.Now().Unix() +} + +// Inc increments the Gauge by 1. Use Add to increment it by arbitrary +// values. +func (g *expiringGauge) Inc() { + g.Gauge.Inc() + g.lastModSec = time.Now().Unix() +} + +// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary +// values. +func (g *expiringGauge) Dec() { + g.Gauge.Dec() + g.lastModSec = time.Now().Unix() +} + +// Add adds the given value to the Gauge. (The value can be negative, +// resulting in a decrease of the Gauge.) +func (g *expiringGauge) Add(val float64) { + g.Gauge.Add(val) + g.lastModSec = time.Now().Unix() +} + +// Sub subtracts the given value from the Gauge. (The value can be +// negative, resulting in an increase of the Gauge.) +func (g *expiringGauge) Sub(val float64) { + g.Gauge.Sub(val) + g.lastModSec = time.Now().Unix() +} + +// SetToCurrentTime sets the Gauge to the current Unix time in seconds. +func (g *expiringGauge) SetToCurrentTime() { + g.Gauge.SetToCurrentTime() + g.lastModSec = time.Now().Unix() +} + +// HasExpired implements Expireable +func (g *expiringGauge) HasExpired(currentTimeSec int64, maxAgeSec int64) bool { + return currentTimeSec-g.lastModSec >= maxAgeSec +} diff --git a/pkg/logentry/metric/gauges_test.go b/pkg/logentry/metric/gauges_test.go new file mode 100644 index 0000000000000..10eb17240edbc --- /dev/null +++ b/pkg/logentry/metric/gauges_test.go @@ -0,0 +1,40 @@ +package metric + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" +) + +func TestGaugeExpiration(t *testing.T) { + t.Parallel() + cfg := GaugeConfig{ + Action: "inc", + } + + gag, err := NewGauges("test1", "HELP ME!!!!!", cfg, 1) + assert.Nil(t, err) + + // Create a label and increment the gauge + lbl1 := model.LabelSet{} + lbl1["test"] = "app" + gag.With(lbl1).Inc() + + // Collect the metrics, should still find the metric in the map + collect(gag) + assert.Contains(t, gag.metrics, lbl1.Fingerprint()) + + time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec + + //Add another gauge with new label val + lbl2 := model.LabelSet{} + lbl2["test"] = "app2" + gag.With(lbl2).Inc() + + // Collect the metrics, first gauge should have expired and removed, second should still be present + collect(gag) + assert.NotContains(t, gag.metrics, lbl1.Fingerprint()) + assert.Contains(t, gag.metrics, lbl2.Fingerprint()) +} diff --git a/pkg/logentry/metric/histograms.go b/pkg/logentry/metric/histograms.go index b609ea158f3e8..9721a76905727 100644 --- a/pkg/logentry/metric/histograms.go +++ b/pkg/logentry/metric/histograms.go @@ -1,6 +1,8 @@ package metric import ( + "time" + "github.com/mitchellh/mapstructure" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -31,7 +33,7 @@ type Histograms struct { } // NewHistograms creates a new histogram vec. -func NewHistograms(name, help string, config interface{}) (*Histograms, error) { +func NewHistograms(name, help string, config interface{}, maxIdleSec int64) (*Histograms, error) { cfg, err := parseHistogramConfig(config) if err != nil { return nil, err @@ -42,13 +44,15 @@ func NewHistograms(name, help string, config interface{}) (*Histograms, error) { } return &Histograms{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { - return prometheus.NewHistogram(prometheus.HistogramOpts{ + return &expiringHistogram{prometheus.NewHistogram(prometheus.HistogramOpts{ Help: help, Name: name, ConstLabels: labels, Buckets: cfg.Buckets, - }) - }), + }), + 0, + } + }, maxIdleSec), Cfg: cfg, }, nil } @@ -57,3 +61,19 @@ func NewHistograms(name, help string, config interface{}) (*Histograms, error) { func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram { return h.metricVec.With(labels).(prometheus.Histogram) } + +type expiringHistogram struct { + prometheus.Histogram + lastModSec int64 +} + +// Observe adds a single observation to the histogram. +func (h *expiringHistogram) Observe(val float64) { + h.Histogram.Observe(val) + h.lastModSec = time.Now().Unix() +} + +// HasExpired implements Expireable +func (h *expiringHistogram) HasExpired(currentTimeSec int64, maxAgeSec int64) bool { + return currentTimeSec-h.lastModSec >= maxAgeSec +} diff --git a/pkg/logentry/metric/histograms_test.go b/pkg/logentry/metric/histograms_test.go new file mode 100644 index 0000000000000..ffb18cded0bef --- /dev/null +++ b/pkg/logentry/metric/histograms_test.go @@ -0,0 +1,38 @@ +package metric + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" +) + +func TestHistogramExpiration(t *testing.T) { + t.Parallel() + cfg := HistogramConfig{} + + hist, err := NewHistograms("test1", "HELP ME!!!!!", cfg, 1) + assert.Nil(t, err) + + // Create a label and increment the histogram + lbl1 := model.LabelSet{} + lbl1["test"] = "app" + hist.With(lbl1).Observe(23) + + // Collect the metrics, should still find the metric in the map + collect(hist) + assert.Contains(t, hist.metrics, lbl1.Fingerprint()) + + time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec + + //Add another histogram with new label val + lbl2 := model.LabelSet{} + lbl2["test"] = "app2" + hist.With(lbl2).Observe(2) + + // Collect the metrics, first histogram should have expired and removed, second should still be present + collect(hist) + assert.NotContains(t, hist.metrics, lbl1.Fingerprint()) + assert.Contains(t, hist.metrics, lbl2.Fingerprint()) +} diff --git a/pkg/logentry/metric/metricvec.go b/pkg/logentry/metric/metricvec.go index 964ddf8ce269f..8ca2f778f9df9 100644 --- a/pkg/logentry/metric/metricvec.go +++ b/pkg/logentry/metric/metricvec.go @@ -2,6 +2,7 @@ package metric import ( "sync" + "time" "github.com/grafana/loki/pkg/util" @@ -9,16 +10,23 @@ import ( "github.com/prometheus/common/model" ) +// Expireable allows checking if something has exceeded the provided maxAge based on the provided currentTime +type Expireable interface { + HasExpired(currentTimeSec int64, maxAgeSec int64) bool +} + type metricVec struct { - factory func(labels map[string]string) prometheus.Metric - mtx sync.Mutex - metrics map[model.Fingerprint]prometheus.Metric + factory func(labels map[string]string) prometheus.Metric + mtx sync.Mutex + metrics map[model.Fingerprint]prometheus.Metric + maxAgeSec int64 } -func newMetricVec(factory func(labels map[string]string) prometheus.Metric) *metricVec { +func newMetricVec(factory func(labels map[string]string) prometheus.Metric, maxAgeSec int64) *metricVec { return &metricVec{ - metrics: map[model.Fingerprint]prometheus.Metric{}, - factory: factory, + metrics: map[model.Fingerprint]prometheus.Metric{}, + factory: factory, + maxAgeSec: maxAgeSec, } } @@ -33,6 +41,7 @@ func (c *metricVec) Collect(ch chan<- prometheus.Metric) { for _, m := range c.metrics { ch <- m } + c.prune() } // With returns the metric associated with the labelset. @@ -48,3 +57,16 @@ func (c *metricVec) With(labels model.LabelSet) prometheus.Metric { } return metric } + +// prune will remove all metrics which implement the Expireable interface and have expired +// it does not take out a lock on the metrics map so whoever calls this function should do so. +func (c *metricVec) prune() { + currentTimeSec := time.Now().Unix() + for fp, m := range c.metrics { + if em, ok := m.(Expireable); ok { + if em.HasExpired(currentTimeSec, c.maxAgeSec) { + delete(c.metrics, fp) + } + } + } +} diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index 15d3c584c1f2b..9eb9091bdbd62 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -25,15 +25,19 @@ const ( ErrEmptyMetricsStageConfig = "empty metric stage configuration" ErrMetricsStageInvalidType = "invalid metric type '%s', metric type must be one of 'counter', 'gauge', or 'histogram'" + ErrInvalidIdleDur = "max_idle_duration could not be parsed as a time.Duration: '%s'" + ErrSubSecIdleDur = "max_idle_duration less than 1s not allowed" ) // MetricConfig is a single metrics configuration. type MetricConfig struct { - MetricType string `mapstructure:"type"` - Description string `mapstructure:"description"` - Source *string `mapstructure:"source"` - Prefix string `mapstructure:"prefix"` - Config interface{} `mapstructure:"config"` + MetricType string `mapstructure:"type"` + Description string `mapstructure:"description"` + Source *string `mapstructure:"source"` + Prefix string `mapstructure:"prefix"` + IdleDuration *string `mapstructure:"max_idle_duration"` + maxIdleSec int64 + Config interface{} `mapstructure:"config"` } // MetricsConfig is a set of configured metrics. @@ -46,7 +50,7 @@ func validateMetricsConfig(cfg MetricsConfig) error { for name, config := range cfg { //If the source is not defined, default to the metric name if config.Source == nil { - cp := config + cp := cfg[name] nm := name cp.Source = &nm cfg[name] = cp @@ -58,6 +62,24 @@ func validateMetricsConfig(cfg MetricsConfig) error { config.MetricType != MetricTypeHistogram { return errors.Errorf(ErrMetricsStageInvalidType, config.MetricType) } + + // Set the idle duration for metrics + if config.IdleDuration != nil { + d, err := time.ParseDuration(*config.IdleDuration) + if err != nil { + return errors.Errorf(ErrInvalidIdleDur, err) + } + if d < 1*time.Second { + return errors.New(ErrSubSecIdleDur) + } + cp := cfg[name] + cp.maxIdleSec = int64(d.Seconds()) + cfg[name] = cp + } else { + cp := cfg[name] + cp.maxIdleSec = int64(5 * time.Minute.Seconds()) + cfg[name] = cp + } } return nil } @@ -86,17 +108,17 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R switch strings.ToLower(cfg.MetricType) { case MetricTypeCounter: - collector, err = metric.NewCounters(customPrefix+name, cfg.Description, cfg.Config) + collector, err = metric.NewCounters(customPrefix+name, cfg.Description, cfg.Config, cfg.maxIdleSec) if err != nil { return nil, err } case MetricTypeGauge: - collector, err = metric.NewGauges(customPrefix+name, cfg.Description, cfg.Config) + collector, err = metric.NewGauges(customPrefix+name, cfg.Description, cfg.Config, cfg.maxIdleSec) if err != nil { return nil, err } case MetricTypeHistogram: - collector, err = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Config) + collector, err = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Config, cfg.maxIdleSec) if err != nil { return nil, err } diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go index aef1ae22c76b9..9ea7eccafbaeb 100644 --- a/pkg/logentry/stages/metrics_test.go +++ b/pkg/logentry/stages/metrics_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" "github.com/grafana/loki/pkg/logentry/metric" ) @@ -108,6 +109,8 @@ func TestMetricsPipeline(t *testing.T) { } } +var metricTestInvalidIdle = "10f" + func Test(t *testing.T) { tests := map[string]struct { config MetricsConfig @@ -125,6 +128,15 @@ func Test(t *testing.T) { }, errors.Errorf(ErrMetricsStageInvalidType, "piplne"), }, + "invalid idle duration": { + MetricsConfig{ + "metric1": MetricConfig{ + MetricType: "Counter", + IdleDuration: &metricTestInvalidIdle, + }, + }, + errors.Errorf(ErrInvalidIdleDur, "time: unknown unit f in duration 10f"), + }, "valid": { MetricsConfig{ "metric1": MetricConfig{ @@ -152,6 +164,24 @@ func Test(t *testing.T) { } } +func TestDefaultIdleDuration(t *testing.T) { + registry := prometheus.NewRegistry() + metricsConfig := MetricsConfig{ + "total_keys": MetricConfig{ + MetricType: "Counter", + Description: "the total keys per doc", + Config: metric.CounterConfig{ + Action: metric.CounterAdd, + }, + }, + } + ms, err := New(util.Logger, nil, StageTypeMetric, metricsConfig, registry) + if err != nil { + t.Fatalf("failed to create stage with metrics: %v", err) + } + assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec) +} + var labelFoo = model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar", "bar": "foo"}) var labelFu = model.LabelSet(map[model.LabelName]model.LabelValue{"fu": "baz", "baz": "fu"})