diff --git a/metric.go b/metric.go index cb7de0fcea003..cc9235c4b2a3d 100644 --- a/metric.go +++ b/metric.go @@ -142,3 +142,9 @@ type UnwrappableMetric interface { // wraps it in the first place. Unwrap() Metric } + +type TrackingMetric interface { + // TrackingID returns the ID used for tracking the metric + TrackingID() TrackingID + UnwrappableMetric +} diff --git a/metric/tracking.go b/metric/tracking.go index c7672f30f89b8..50f11c74d6dae 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -150,6 +150,11 @@ func (m *trackingMetric) decr() { } } +// Unwrap allows to access the underlying metric directly e.g. for go-templates +func (m *trackingMetric) TrackingID() telegraf.TrackingID { + return m.d.id +} + // Unwrap allows to access the underlying metric directly e.g. for go-templates func (m *trackingMetric) Unwrap() telegraf.Metric { return m.Metric diff --git a/plugins/common/starlark/builtins.go b/plugins/common/starlark/builtins.go index 3845e18550985..1a50bdae060ef 100644 --- a/plugins/common/starlark/builtins.go +++ b/plugins/common/starlark/builtins.go @@ -7,6 +7,7 @@ import ( "go.starlark.net/starlark" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) @@ -49,13 +50,20 @@ func items(value starlark.Value, errorMsg string) ([]starlark.Tuple, error) { func deepcopy(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { var sm *Metric - if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil { + var track bool + if err := starlark.UnpackArgs("deepcopy", args, kwargs, "source", &sm, "track?", &track); err != nil { return nil, err } - dup := sm.metric.Copy() - dup.Drop() - return &Metric{metric: dup}, nil + // In case we copy a tracking metric but do not want to track the result, + // we have to strip the tracking information. This can be done by unwrapping + // the metric. + if tm, ok := sm.metric.(telegraf.TrackingMetric); ok && !track { + return &Metric{metric: tm.Unwrap().Copy()}, nil + } + + // Copy the whole metric including potential tracking information + return &Metric{metric: sm.metric.Copy()}, nil } // catch(f) evaluates f() and returns its evaluation error message diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 61744cbdf6168..cbf67d7b75ee4 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -2024,38 +2024,34 @@ func TestParse_DeltaCounter(t *testing.T) { require.Eventuallyf(t, func() bool { require.NoError(t, statsd.Gather(acc)) - acc.Lock() - defer acc.Unlock() - - fmt.Println(acc.NMetrics()) - expected := []telegraf.Metric{ - testutil.MustMetric( - "cpu_time_idle", - map[string]string{ - "metric_type": "counter", - "temporality": "delta", - }, - map[string]interface{}{ - "value": 42, - }, - time.Now(), - telegraf.Counter, - ), - } - got := acc.GetTelegrafMetrics() - testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time")) + return acc.NMetrics() >= 1 + }, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics()) - startTime, ok := got[0].GetField("start_time") - require.True(t, ok, "expected start_time field") + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + "temporality": "delta", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + telegraf.Counter, + ), + } + got := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time")) - startTimeStr, ok := startTime.(string) - require.True(t, ok, "expected start_time field to be a string") + startTime, ok := got[0].GetField("start_time") + require.True(t, ok, "expected start_time field") - _, err = time.Parse(time.RFC3339, startTimeStr) - require.NoError(t, err, "execpted start_time field to be in RFC3339 format") + startTimeStr, ok := startTime.(string) + require.True(t, ok, "expected start_time field to be a string") - return acc.NMetrics() >= 1 - }, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics()) + _, err = time.Parse(time.RFC3339, startTimeStr) + require.NoError(t, err, "execpted start_time field to be in RFC3339 format") require.NoError(t, conn.Close()) } diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 052515b7c2bea..5546a3591f375 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -356,17 +356,7 @@ cpu,42 return &parser, err }) - err = plugin.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} - err = plugin.Start(&acc) - require.NoError(t, err) - defer plugin.Stop() - err = plugin.Gather(&acc) - require.NoError(t, err) - acc.Wait(2) - plugin.Stop() + require.NoError(t, plugin.Init()) expected := []telegraf.Metric{ testutil.MustMetric("cpu", @@ -386,6 +376,15 @@ cpu,42 }, time.Unix(0, 0)), } + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + require.NoError(t, plugin.Gather(&acc)) + require.Eventually(t, func() bool { + return acc.NFields() >= len(expected) + }, 3*time.Second, 100*time.Millisecond) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } diff --git a/plugins/processors/starlark/README.md b/plugins/processors/starlark/README.md index dcd04b476f15e..785f09c08b262 100644 --- a/plugins/processors/starlark/README.md +++ b/plugins/processors/starlark/README.md @@ -84,7 +84,12 @@ of type int, float, string, or bool. The timestamp of the metric as an integer in nanoseconds since the Unix epoch. -- **deepcopy(*metric*)**: Make a copy of an existing metric. +- **deepcopy(*metric*, *track=false*)**: +Copy an existing metric with or without tracking information. If `track` is set +to `true`, the tracking information is copied. +**Caution:** Make sure to always return *all* metrics with tracking information! +Otherwise, the corresponding inputs will never receive the delivery information +and potentially overrun! ### Python Differences diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index 3e3cb1e63410b..786c60aef8b11 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -79,7 +79,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err // the original metric to the accumulator if v.ID == origMetric.HashID() { origFound = true - m.Accept() s.results = append(s.results, origMetric) acc.AddMetric(origMetric) continue @@ -108,7 +107,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err // If we got the original metric back, use that and drop the new one. // Otherwise mark the original as accepted and use the new metric. if origMetric.HashID() == rv.ID { - m.Accept() acc.AddMetric(origMetric) } else { origMetric.Accept() diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index 049519e2a1413..831b08cc036e8 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -3400,6 +3400,32 @@ def apply(metric): newmetric = Metric("new_metric") newmetric.fields["vaue"] = 42 return [metric, newmetric] +`, + }, + { + name: "return original and deep-copy", + numMetrics: 2, + source: ` +def apply(metric): + return [metric, deepcopy(metric, track=True)] +`, + }, + { + name: "deep-copy but do not return", + numMetrics: 1, + source: ` +def apply(metric): + x = deepcopy(metric) + return [metric] +`, + }, + { + name: "deep-copy but do not return original metric", + numMetrics: 1, + source: ` +def apply(metric): + x = deepcopy(metric, track=True) + return [x] `, }, } @@ -3426,8 +3452,9 @@ def apply(metric): plugin.Stop() // Ensure we get back the correct number of metrics - require.Len(t, acc.GetTelegrafMetrics(), tt.numMetrics) - for _, m := range acc.GetTelegrafMetrics() { + actual := acc.GetTelegrafMetrics() + require.Lenf(t, actual, tt.numMetrics, "expected %d metrics but got %d", tt.numMetrics, len(actual)) + for _, m := range actual { m.Accept() } diff --git a/plugins/processors/starlark/testdata/value_filter.star b/plugins/processors/starlark/testdata/value_filter.star index a4ceb28a68a72..997bcb6bc67ef 100644 --- a/plugins/processors/starlark/testdata/value_filter.star +++ b/plugins/processors/starlark/testdata/value_filter.star @@ -1,7 +1,7 @@ # Filter metrics by value ''' In this example we look at the `value` field of the metric. -If the value is zeor, we delete all the fields, effectively dropping the metric. +If the value is zero, we delete all the fields, effectively dropping the metric. Example Input: temperature sensor="001A0",value=111.48 1618488000000000999 diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 05b97d0f6090e..050ede86404ab 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -12,17 +12,9 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) -var ( - lastID uint64 -) - -func newTrackingID() telegraf.TrackingID { - id := atomic.AddUint64(&lastID, 1) - return telegraf.TrackingID(id) -} - // Metric defines a single point measurement type Metric struct { Measurement string @@ -38,33 +30,49 @@ func (p *Metric) String() string { // Accumulator defines a mocked out accumulator type Accumulator struct { - sync.Mutex - *sync.Cond - - Metrics []*Metric - nMetrics uint64 - Discard bool - Errors []error - debug bool - delivered chan telegraf.DeliveryInfo + nMetrics uint64 // Needs to be first to avoid unaligned atomic operations on 32-bit archs + Metrics []*Metric + accumulated []telegraf.Metric + Discard bool + Errors []error + debug bool + deliverChan chan telegraf.DeliveryInfo + delivered []telegraf.DeliveryInfo TimeFunc func() time.Time + + sync.Mutex + *sync.Cond } func (a *Accumulator) NMetrics() uint64 { return atomic.LoadUint64(&a.nMetrics) } +func (a *Accumulator) NDelivered() int { + a.Lock() + defer a.Unlock() + return len(a.delivered) +} + // GetTelegrafMetrics returns all the metrics collected by the accumulator // If you are getting race conditions here then you are not waiting for all of your metrics to arrive: see Wait() func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric { - metrics := []telegraf.Metric{} - for _, m := range a.Metrics { - metrics = append(metrics, FromTestMetric(m)) - } + a.Lock() + defer a.Unlock() + metrics := make([]telegraf.Metric, 0, len(a.accumulated)) + metrics = append(metrics, a.accumulated...) return metrics } +func (a *Accumulator) GetDeliveries() []telegraf.DeliveryInfo { + a.Lock() + defer a.Unlock() + info := make([]telegraf.DeliveryInfo, 0, len(a.delivered)) + info = append(info, a.delivered...) + return info +} + func (a *Accumulator) FirstError() error { if len(a.Errors) == 0 { return nil @@ -77,6 +85,7 @@ func (a *Accumulator) ClearMetrics() { defer a.Unlock() atomic.StoreUint64(&a.nMetrics, 0) a.Metrics = make([]*Metric, 0) + a.accumulated = make([]telegraf.Metric, 0) } func (a *Accumulator) addMeasurement( @@ -129,7 +138,7 @@ func (a *Accumulator) addMeasurement( fmt.Print(msg) } - p := &Metric{ + m := &Metric{ Measurement: measurement, Fields: fieldsCopy, Tags: tagsCopy, @@ -137,7 +146,8 @@ func (a *Accumulator) addMeasurement( Type: tp, } - a.Metrics = append(a.Metrics, p) + a.Metrics = append(a.Metrics, m) + a.accumulated = append(a.accumulated, FromTestMetric(m)) } // AddFields adds a measurement point with a specified timestamp. @@ -170,7 +180,7 @@ func (a *Accumulator) AddGauge( func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { for _, m := range metrics { - a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) + a.AddMetric(m) } } @@ -193,32 +203,57 @@ func (a *Accumulator) AddHistogram( } func (a *Accumulator) AddMetric(m telegraf.Metric) { - a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) + a.Lock() + defer a.Unlock() + atomic.AddUint64(&a.nMetrics, 1) + if a.Cond != nil { + a.Cond.Broadcast() + } + if a.Discard { + return + } + + // Drop metrics without fields + if len(m.FieldList()) == 0 { + return + } + + a.Metrics = append(a.Metrics, ToTestMetric(m)) + a.accumulated = append(a.accumulated, m) } -func (a *Accumulator) WithTracking(_ int) telegraf.TrackingAccumulator { +func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { + a.deliverChan = make(chan telegraf.DeliveryInfo, maxTracked) + a.delivered = make([]telegraf.DeliveryInfo, 0, maxTracked) return a } func (a *Accumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID { - a.AddMetric(m) - return newTrackingID() + dm, id := metric.WithTracking(m, a.onDelivery) + a.AddMetric(dm) + return id } func (a *Accumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID { - for _, m := range group { + db, id := metric.WithGroupTracking(group, a.onDelivery) + for _, m := range db { a.AddMetric(m) } - return newTrackingID() + return id } -func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo { - a.Lock() - if a.delivered == nil { - a.delivered = make(chan telegraf.DeliveryInfo) +func (a *Accumulator) onDelivery(info telegraf.DeliveryInfo) { + select { + case a.deliverChan <- info: + default: + // This is a programming error in the input. More items were sent for + // tracking than space requested. + panic("channel is full") } - a.Unlock() - return a.delivered +} + +func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo { + return a.deliverChan } // AddError appends the given error to Accumulator.Errors. diff --git a/testutil/metric.go b/testutil/metric.go index 123f8f84afc42..45f6ff524d54b 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -372,3 +372,23 @@ func FromTestMetric(met *Metric) telegraf.Metric { m := telegrafMetric.New(met.Measurement, met.Tags, met.Fields, met.Time, met.Type) return m } + +func ToTestMetric(tm telegraf.Metric) *Metric { + tags := make(map[string]string, len(tm.TagList())) + for _, t := range tm.TagList() { + tags[t.Key] = t.Value + } + + fields := make(map[string]interface{}, len(tm.FieldList())) + for _, f := range tm.FieldList() { + fields[f.Key] = f.Value + } + + return &Metric{ + Measurement: tm.Name(), + Fields: fields, + Tags: tags, + Time: tm.Time(), + Type: tm.Type(), + } +}