Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable Max TTL duration for statsd input plugin entries #8509

Merged
merged 2 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
## Maximum socket buffer size in bytes, once the buffer fills up, metrics
## will start dropping. Defaults to the OS default.
# read_buffer_size = 65535

## Max duration (TTL) for each metric to stay cached/reported without being updated.
# max_ttl = "10h"
```

### Description
Expand Down Expand Up @@ -192,6 +195,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **max_ttl** config.Duration: Max duration (TTL) for each metric to stay cached/reported without being updated.

### Statsd bucket -> InfluxDB line-protocol Templates

Expand Down
121 changes: 87 additions & 34 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
Expand Down Expand Up @@ -117,6 +118,9 @@ type Statsd struct {
TCPKeepAlive bool `toml:"tcp_keep_alive"`
TCPKeepAlivePeriod *internal.Duration `toml:"tcp_keep_alive_period"`

// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`

graphiteParser *graphite.GraphiteParser

acc telegraf.Accumulator
Expand All @@ -131,7 +135,7 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat

Log telegraf.Logger
Log telegraf.Logger `toml:"-"`

// A pool of byte slices to handle parsing
bufPool sync.Pool
Expand Down Expand Up @@ -159,27 +163,31 @@ type metric struct {
}

type cachedset struct {
name string
fields map[string]map[string]bool
tags map[string]string
name string
fields map[string]map[string]bool
tags map[string]string
expiresAt time.Time
}

type cachedgauge struct {
name string
fields map[string]interface{}
tags map[string]string
name string
fields map[string]interface{}
tags map[string]string
expiresAt time.Time
}

type cachedcounter struct {
name string
fields map[string]interface{}
tags map[string]string
name string
fields map[string]interface{}
tags map[string]string
expiresAt time.Time
}

type cachedtimings struct {
name string
fields map[string]RunningStats
tags map[string]string
name string
fields map[string]RunningStats
tags map[string]string
expiresAt time.Time
}

func (_ *Statsd) Description() string {
Expand Down Expand Up @@ -243,6 +251,9 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
  ## Max duration (TTL) for each metric to stay cached/reported without being updated.
  # max_ttl = "10h"
`

func (_ *Statsd) SampleConfig() string {
Expand Down Expand Up @@ -306,6 +317,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
if s.DeleteSets {
s.sets = make(map[string]cachedset)
}

s.expireCachedMetrics()

return nil
}

Expand Down Expand Up @@ -527,9 +541,6 @@ func (s *Statsd) parser() error {
// parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

lineTags := make(map[string]string)
if s.DataDogExtensions {
recombinedSegments := make([]string, 0)
Expand Down Expand Up @@ -734,6 +745,9 @@ func parseKeyValue(keyvalue string) (string, string) {
// aggregates and caches the current value(s). It does not deal with the
// Delete* options, because those are dealt with in the Gather function.
func (s *Statsd) aggregate(m metric) {
s.Lock()
defer s.Unlock()

switch m.mtype {
case "ms", "h":
// Check if the measurement exists
Expand Down Expand Up @@ -761,61 +775,67 @@ func (s *Statsd) aggregate(m metric) {
field.AddValue(m.floatvalue)
}
cached.fields[m.field] = field
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.timings[m.hash] = cached
case "c":
// check if the measurement exists
_, ok := s.counters[m.hash]
cached, ok := s.counters[m.hash]
if !ok {
s.counters[m.hash] = cachedcounter{
cached = cachedcounter{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
}
// check if the field exists
_, ok = s.counters[m.hash].fields[m.field]
_, ok = cached.fields[m.field]
if !ok {
s.counters[m.hash].fields[m.field] = int64(0)
cached.fields[m.field] = int64(0)
}
s.counters[m.hash].fields[m.field] =
s.counters[m.hash].fields[m.field].(int64) + m.intvalue
cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.counters[m.hash] = cached
case "g":
// check if the measurement exists
_, ok := s.gauges[m.hash]
cached, ok := s.gauges[m.hash]
if !ok {
s.gauges[m.hash] = cachedgauge{
cached = cachedgauge{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
}
// check if the field exists
_, ok = s.gauges[m.hash].fields[m.field]
_, ok = cached.fields[m.field]
if !ok {
s.gauges[m.hash].fields[m.field] = float64(0)
cached.fields[m.field] = float64(0)
}
if m.additive {
s.gauges[m.hash].fields[m.field] =
s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue
cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
} else {
s.gauges[m.hash].fields[m.field] = m.floatvalue
cached.fields[m.field] = m.floatvalue
}

cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.gauges[m.hash] = cached
case "s":
// check if the measurement exists
_, ok := s.sets[m.hash]
cached, ok := s.sets[m.hash]
if !ok {
s.sets[m.hash] = cachedset{
cached = cachedset{
name: m.name,
fields: make(map[string]map[string]bool),
tags: m.tags,
}
}
// check if the field exists
_, ok = s.sets[m.hash].fields[m.field]
_, ok = cached.fields[m.field]
if !ok {
s.sets[m.hash].fields[m.field] = make(map[string]bool)
cached.fields[m.field] = make(map[string]bool)
}
s.sets[m.hash].fields[m.field][m.strvalue] = true
cached.fields[m.field][m.strvalue] = true
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.sets[m.hash] = cached
}
}

Expand Down Expand Up @@ -932,6 +952,39 @@ func (s *Statsd) isUDP() bool {
return strings.HasPrefix(s.Protocol, "udp")
}

// expireCachedMetrics drops every cached statsd entry whose TTL has elapsed.
// Entries refresh their expiresAt on each update in aggregate(), so only
// metrics that have gone quiet for longer than max_ttl are removed here.
// It is a no-op when max_ttl was not configured.
func (s *Statsd) expireCachedMetrics() {
	// A zero MaxTTL means expiration is disabled.
	if s.MaxTTL == 0 {
		return
	}

	deadline := time.Now()

	for hash, entry := range s.counters {
		if deadline.After(entry.expiresAt) {
			delete(s.counters, hash)
		}
	}

	for hash, entry := range s.timings {
		if deadline.After(entry.expiresAt) {
			delete(s.timings, hash)
		}
	}

	for hash, entry := range s.gauges {
		if deadline.After(entry.expiresAt) {
			delete(s.gauges, hash)
		}
	}

	for hash, entry := range s.sets {
		if deadline.After(entry.expiresAt) {
			delete(s.sets, hash)
		}
	}
}

func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
Expand Down
69 changes: 65 additions & 4 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package statsd

import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net"
"sync"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -1077,6 +1079,65 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
}
}

// Test that the metric caches expire (clear) an entry after the entry hasn't
// been updated for the configurable MaxTTL duration. A counter is written
// twice, allowed to expire, gathered, then written once more; the expired
// cache must restart the counter from zero rather than keep accumulating.
func TestCachesExpireAfterMaxTTL(t *testing.T) {
	s := NewTestStatsd()
	s.MaxTTL = config.Duration(100 * time.Microsecond)

	acc := &testutil.Accumulator{}
	// Check parse errors explicitly: a typo in a test line would otherwise
	// silently leave the cache empty and produce a misleading failure below.
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.Gather(acc))

	// Max TTL goes by, our 'valid' entry is cleared.
	time.Sleep(100 * time.Microsecond)
	require.NoError(t, s.Gather(acc))

	// Now when we gather, we should have a counter that is reset to zero.
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.Gather(acc))

	testutil.RequireMetricsEqual(t,
		[]telegraf.Metric{
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 90,
				},
				time.Now(),
				telegraf.Counter,
			),
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 90,
				},
				time.Now(),
				telegraf.Counter,
			),
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 45,
				},
				time.Now(),
				telegraf.Counter,
			),
		},
		acc.GetTelegrafMetrics(),
		testutil.IgnoreTime(),
	)
}

// Test that measurements with multiple bits, are treated as different outputs
// but are equal to their single-measurement representation
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
Expand Down