Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate and interval to the basicstats aggregator plugin #8428

Merged
merged 1 commit into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ emitting the aggregate every `period` seconds.
drop_original = false

## Configures which basic stats to push as fields
# stats = ["count","diff","min","max","mean","non_negative_diff","stdev","s2","sum"]
# stats = ["count","diff","rate","min","max","mean","non_negative_diff","non_negative_rate","stdev","s2","sum","interval"]
```

- stats
Expand All @@ -28,13 +28,16 @@ emitting the aggregate every `period` seconds.
- measurement1
- field1_count
- field1_diff (difference)
- field1_rate (rate per second)
- field1_max
- field1_min
- field1_mean
- field1_non_negative_diff (non-negative difference)
- field1_non_negative_rate (non-negative rate per second)
- field1_sum
- field1_s2 (variance)
- field1_stdev (standard deviation)
- field1_interval (interval in nanoseconds)

### Tags:

Expand All @@ -46,8 +49,8 @@ No tags are applied by this aggregator.
$ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_diff=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1_count=2,load1_diff=0,load1_rate=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0,load1_interval=10000000000i 1475584010000000000
system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_diff=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000
system,host=tars load1_count=2,load1_diff=2,load1_rate=0.2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162,load1_interval=10000000000i 1475584010000000000
```
69 changes: 51 additions & 18 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package basicstats

import (
"math"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
Expand All @@ -25,6 +26,9 @@ type configuredStats struct {
sum bool
diff bool
non_negative_diff bool
rate bool
non_negative_rate bool
interval bool
}

func NewBasicStats() *BasicStats {
Expand All @@ -40,14 +44,17 @@ type aggregate struct {
}

type basicstats struct {
count float64
min float64
max float64
sum float64
mean float64
diff float64
M2 float64 //intermediate value for variance/stdev
LAST float64 //intermediate value for diff
count float64
min float64
max float64
sum float64
mean float64
diff float64
rate float64
interval time.Duration
M2 float64 //intermediate value for variance/stdev
LAST float64 //intermediate value for diff
TIME time.Time //intermediate value for rate
}

var sampleConfig = `
Expand Down Expand Up @@ -88,8 +95,10 @@ func (b *BasicStats) Add(in telegraf.Metric) {
mean: fv,
sum: fv,
diff: 0.0,
rate: 0.0,
M2: 0.0,
LAST: fv,
TIME: in.Time(),
}
}
}
Expand All @@ -100,14 +109,17 @@ func (b *BasicStats) Add(in telegraf.Metric) {
if _, ok := b.cache[id].fields[field.Key]; !ok {
// hit an uncached field of a cached metric
b.cache[id].fields[field.Key] = basicstats{
count: 1,
min: fv,
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
M2: 0.0,
LAST: fv,
count: 1,
min: fv,
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
rate: 0.0,
interval: 0,
M2: 0.0,
LAST: fv,
TIME: in.Time(),
}
continue
}
Expand Down Expand Up @@ -138,6 +150,12 @@ func (b *BasicStats) Add(in telegraf.Metric) {
tmp.sum += fv
//diff compute
tmp.diff = fv - tmp.LAST
//interval compute
tmp.interval = in.Time().Sub(tmp.TIME)
//rate compute
if !in.Time().Equal(tmp.TIME) {
tmp.rate = tmp.diff / tmp.interval.Seconds()
}
//store final data
b.cache[id].fields[field.Key] = tmp
}
Expand Down Expand Up @@ -182,7 +200,15 @@ func (b *BasicStats) Push(acc telegraf.Accumulator) {
if b.statsConfig.non_negative_diff && v.diff >= 0 {
fields[k+"_non_negative_diff"] = v.diff
}

if b.statsConfig.rate {
fields[k+"_rate"] = v.rate
}
if b.statsConfig.non_negative_rate && v.diff >= 0 {
fields[k+"_non_negative_rate"] = v.rate
}
if b.statsConfig.interval {
fields[k+"_interval"] = v.interval.Nanoseconds()
}
}
//if count == 1 StdDev = infinite => so I won't send data
}
Expand Down Expand Up @@ -217,7 +243,12 @@ func (b *BasicStats) parseStats() *configuredStats {
parsed.diff = true
case "non_negative_diff":
parsed.non_negative_diff = true

case "rate":
parsed.rate = true
case "non_negative_rate":
parsed.non_negative_rate = true
case "interval":
parsed.interval = true
default:
b.Log.Warnf("Unrecognized basic stat %q, ignoring", name)
}
Expand All @@ -237,6 +268,8 @@ func (b *BasicStats) getConfiguredStats() {
stdev: true,
sum: false,
non_negative_diff: false,
rate: false,
non_negative_rate: false,
}
} else {
b.statsConfig = b.parseStats()
Expand Down
79 changes: 77 additions & 2 deletions plugins/aggregators/basicstats/basicstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var m1, _ = metric.New("m1",
"d": float64(2),
"g": int64(3),
},
time.Now(),
time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
)
var m2, _ = metric.New("m1",
map[string]string{"foo": "bar"},
Expand All @@ -34,7 +34,7 @@ var m2, _ = metric.New("m1",
"andme": true,
"g": int64(1),
},
time.Now(),
time.Date(2000, 1, 1, 0, 0, 0, 1e6, time.UTC),
)

func BenchmarkApply(b *testing.B) {
Expand Down Expand Up @@ -498,6 +498,81 @@ func TestBasicStatsWithDiff(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

func TestBasicStatsWithRate(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"rate"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_rate": float64(0),
"b_rate": float64(2000),
"c_rate": float64(2000),
"d_rate": float64(4000),
"g_rate": float64(-2000),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

func TestBasicStatsWithNonNegativeRate(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"non_negative_rate"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_non_negative_rate": float64(0),
"b_non_negative_rate": float64(2000),
"c_non_negative_rate": float64(2000),
"d_non_negative_rate": float64(4000),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
func TestBasicStatsWithInterval(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"interval"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_interval": int64(time.Millisecond),
"b_interval": int64(time.Millisecond),
"c_interval": int64(time.Millisecond),
"d_interval": int64(time.Millisecond),
"g_interval": int64(time.Millisecond),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating non_negative_diff
func TestBasicStatsWithNonNegativeDiff(t *testing.T) {

Expand Down