diff --git a/plugins/aggregators/basicstats/README.md b/plugins/aggregators/basicstats/README.md index 8fef0c6f4886a..f13dd8f375682 100644 --- a/plugins/aggregators/basicstats/README.md +++ b/plugins/aggregators/basicstats/README.md @@ -16,7 +16,7 @@ emitting the aggregate every `period` seconds. drop_original = false ## Configures which basic stats to push as fields - # stats = ["count","diff","min","max","mean","non_negative_diff","stdev","s2","sum"] + # stats = ["count","diff","rate","min","max","mean","non_negative_diff","non_negative_rate","stdev","s2","sum","interval"] ``` - stats @@ -28,13 +28,16 @@ emitting the aggregate every `period` seconds. - measurement1 - field1_count - field1_diff (difference) + - field1_rate (rate per second) - field1_max - field1_min - field1_mean - field1_non_negative_diff (non-negative difference) + - field1_non_negative_rate (non-negative rate per second) - field1_sum - field1_s2 (variance) - field1_stdev (standard deviation) + - field1_interval (interval in nanoseconds) ### Tags: @@ -46,8 +49,8 @@ No tags are applied by this aggregator. $ telegraf --config telegraf.conf --quiet system,host=tars load1=1 1475583980000000000 system,host=tars load1=1 1475583990000000000 -system,host=tars load1_count=2,load1_diff=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000 +system,host=tars load1_count=2,load1_diff=0,load1_rate=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0,load1_interval=10000000000i 1475584010000000000 system,host=tars load1=1 1475584020000000000 system,host=tars load1=3 1475584030000000000 -system,host=tars load1_count=2,load1_diff=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000 +system,host=tars load1_count=2,load1_diff=2,load1_rate=0.2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162,load1_interval=10000000000i 1475584010000000000 ``` diff --git a/plugins/aggregators/basicstats/basicstats.go b/plugins/aggregators/basicstats/basicstats.go index 4e62ee31123a4..67cee50c4609b 100644 --- a/plugins/aggregators/basicstats/basicstats.go +++ b/plugins/aggregators/basicstats/basicstats.go @@ -2,6 +2,7 @@ package basicstats import ( "math" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/aggregators" @@ -25,6 +26,9 @@ type configuredStats struct { sum bool diff bool non_negative_diff bool + rate bool + non_negative_rate bool + interval bool } func NewBasicStats() *BasicStats { @@ -40,14 +44,17 @@ type aggregate struct { } type basicstats struct { - count float64 - min float64 - max float64 - sum float64 - mean float64 - diff float64 - M2 float64 //intermediate value for variance/stdev - LAST float64 //intermediate value for diff + count float64 + min float64 + max float64 + sum float64 + mean float64 + diff float64 + rate float64 + interval time.Duration + M2 float64 //intermediate value for variance/stdev + LAST float64 //intermediate value for diff + TIME time.Time //intermediate value for rate } var sampleConfig = ` @@ -88,8 +95,10 @@ func (b *BasicStats) Add(in telegraf.Metric) { mean: fv, sum: fv, diff: 0.0, + rate: 0.0, M2: 0.0, LAST: fv, + TIME: in.Time(), } } } @@ -100,14 +109,17 @@ func (b *BasicStats) Add(in telegraf.Metric) { if _, ok := b.cache[id].fields[field.Key]; !ok { // hit an uncached field of a cached metric b.cache[id].fields[field.Key] = basicstats{ - count: 1, - min: fv, - max: fv, - mean: fv, - sum: fv, - diff: 0.0, - M2: 0.0, - LAST: fv, + count: 1, + min: fv, + max: fv, + mean: fv, + sum: fv, + diff: 0.0, + rate: 0.0, + interval: 0, + M2: 0.0, + LAST: fv, + TIME: in.Time(), } continue } @@ -138,6 +150,12 @@ func (b *BasicStats) Add(in telegraf.Metric) { tmp.sum += fv //diff compute tmp.diff = fv - tmp.LAST + //interval compute + tmp.interval = in.Time().Sub(tmp.TIME) + //rate compute + if !in.Time().Equal(tmp.TIME) { + tmp.rate = tmp.diff / tmp.interval.Seconds() + } //store final data b.cache[id].fields[field.Key] = tmp } @@ -182,7 +200,15 @@ func (b *BasicStats) Push(acc telegraf.Accumulator) { if b.statsConfig.non_negative_diff && v.diff >= 0 { fields[k+"_non_negative_diff"] = v.diff } - + if b.statsConfig.rate { + fields[k+"_rate"] = v.rate + } + if b.statsConfig.non_negative_rate && v.diff >= 0 { + fields[k+"_non_negative_rate"] = v.rate + } + if b.statsConfig.interval { + fields[k+"_interval"] = v.interval.Nanoseconds() + } } //if count == 1 StdDev = infinite => so I won't send data } @@ -217,7 +243,12 @@ func (b *BasicStats) parseStats() *configuredStats { parsed.diff = true case "non_negative_diff": parsed.non_negative_diff = true - + case "rate": + parsed.rate = true + case "non_negative_rate": + parsed.non_negative_rate = true + case "interval": + parsed.interval = true default: b.Log.Warnf("Unrecognized basic stat %q, ignoring", name) } @@ -237,6 +268,8 @@ func (b *BasicStats) getConfiguredStats() { stdev: true, sum: false, non_negative_diff: false, + rate: false, + non_negative_rate: false, } } else { b.statsConfig = b.parseStats() diff --git a/plugins/aggregators/basicstats/basicstats_test.go b/plugins/aggregators/basicstats/basicstats_test.go index c5a093840abc7..8b2e9c7397872 100644 --- a/plugins/aggregators/basicstats/basicstats_test.go +++ b/plugins/aggregators/basicstats/basicstats_test.go @@ -19,7 +19,7 @@ var m1, _ = metric.New("m1", "d": float64(2), "g": int64(3), }, - time.Now(), + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), ) var m2, _ = metric.New("m1", map[string]string{"foo": "bar"}, @@ -34,7 +34,7 @@ var m2, _ = metric.New("m1", "andme": true, "g": int64(1), }, - time.Now(), + time.Date(2000, 1, 1, 0, 0, 0, 1e6, time.UTC), ) func BenchmarkApply(b *testing.B) { @@ -498,6 +498,81 @@ func TestBasicStatsWithDiff(t *testing.T) { acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) } +func TestBasicStatsWithRate(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"rate"} + aggregator.Log = testutil.Logger{} + aggregator.getConfiguredStats() + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + expectedFields := map[string]interface{}{ + "a_rate": float64(0), + "b_rate": float64(2000), + "c_rate": float64(2000), + "d_rate": float64(4000), + "g_rate": float64(-2000), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +func TestBasicStatsWithNonNegativeRate(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"non_negative_rate"} + aggregator.Log = testutil.Logger{} + aggregator.getConfiguredStats() + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_non_negative_rate": float64(0), + "b_non_negative_rate": float64(2000), + "c_non_negative_rate": float64(2000), + "d_non_negative_rate": float64(4000), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} +func TestBasicStatsWithInterval(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"interval"} + aggregator.Log = testutil.Logger{} + aggregator.getConfiguredStats() + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_interval": int64(time.Millisecond), + "b_interval": int64(time.Millisecond), + "c_interval": int64(time.Millisecond), + "d_interval": int64(time.Millisecond), + "g_interval": int64(time.Millisecond), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + // Test only aggregating non_negative_diff func TestBasicStatsWithNonNegativeDiff(t *testing.T) {