Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processors.starlark): Avoid negative refcounts for tracking metrics #14395

Merged
merged 11 commits into from
Dec 7, 2023
6 changes: 6 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,9 @@ type UnwrappableMetric interface {
// wraps it in the first place.
Unwrap() Metric
}

type TrackingMetric interface {
// TrackingID returns the ID used for tracking the metric
TrackingID() TrackingID
UnwrappableMetric
}
5 changes: 5 additions & 0 deletions metric/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (m *trackingMetric) decr() {
}
}

// Unwrap allows to access the underlying metric directly e.g. for go-templates
func (m *trackingMetric) TrackingID() telegraf.TrackingID {
return m.d.id
}

// Unwrap allows to access the underlying metric directly e.g. for go-templates
func (m *trackingMetric) Unwrap() telegraf.Metric {
return m.Metric
Expand Down
16 changes: 12 additions & 4 deletions plugins/common/starlark/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.starlark.net/starlark"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

Expand Down Expand Up @@ -49,13 +50,20 @@ func items(value starlark.Value, errorMsg string) ([]starlark.Tuple, error) {

func deepcopy(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var sm *Metric
if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil {
var track bool
if err := starlark.UnpackArgs("deepcopy", args, kwargs, "source", &sm, "track?", &track); err != nil {
return nil, err
}

dup := sm.metric.Copy()
dup.Drop()
return &Metric{metric: dup}, nil
// In case we copy a tracking metric but do not want to track the result,
// we have to strip the tracking information. This can be done by unwrapping
// the metric.
if tm, ok := sm.metric.(telegraf.TrackingMetric); ok && !track {
return &Metric{metric: tm.Unwrap().Copy()}, nil
}

// Copy the whole metric including potential tracking information
return &Metric{metric: sm.metric.Copy()}, nil
}

// catch(f) evaluates f() and returns its evaluation error message
Expand Down
52 changes: 24 additions & 28 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,38 +2024,34 @@ func TestParse_DeltaCounter(t *testing.T) {

require.Eventuallyf(t, func() bool {
require.NoError(t, statsd.Gather(acc))
acc.Lock()
defer acc.Unlock()

fmt.Println(acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))
return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())

startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))

startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")
startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")

_, err = time.Parse(time.RFC3339, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339 format")
startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")

return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())
_, err = time.Parse(time.RFC3339, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339 format")

require.NoError(t, conn.Close())
}
21 changes: 10 additions & 11 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,7 @@ cpu,42
return &parser, err
})

err = plugin.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
plugin.Stop()
require.NoError(t, plugin.Init())

expected := []telegraf.Metric{
testutil.MustMetric("cpu",
Expand All @@ -386,6 +376,15 @@ cpu,42
},
time.Unix(0, 0)),
}

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

require.NoError(t, plugin.Gather(&acc))
require.Eventually(t, func() bool {
return acc.NFields() >= len(expected)
}, 3*time.Second, 100*time.Millisecond)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

Expand Down
7 changes: 6 additions & 1 deletion plugins/processors/starlark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ of type int, float, string, or bool.
The timestamp of the metric as an integer in nanoseconds since the Unix
epoch.

- **deepcopy(*metric*)**: Make a copy of an existing metric.
- **deepcopy(*metric*, *track=false*)**:
Copy an existing metric with or without tracking information. If `track` is set
to `true`, the tracking information is copied.
**Caution:** Make sure to always return *all* metrics with tracking information!
Otherwise, the corresponding inputs will never receive the delivery information
and potentially overrun!

### Python Differences

Expand Down
2 changes: 0 additions & 2 deletions plugins/processors/starlark/starlark.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
// the original metric to the accumulator
if v.ID == origMetric.HashID() {
origFound = true
m.Accept()
s.results = append(s.results, origMetric)
acc.AddMetric(origMetric)
continue
Expand Down Expand Up @@ -108,7 +107,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
// If we got the original metric back, use that and drop the new one.
// Otherwise mark the original as accepted and use the new metric.
if origMetric.HashID() == rv.ID {
m.Accept()
acc.AddMetric(origMetric)
} else {
origMetric.Accept()
Expand Down
31 changes: 29 additions & 2 deletions plugins/processors/starlark/starlark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3400,6 +3400,32 @@ def apply(metric):
newmetric = Metric("new_metric")
newmetric.fields["vaue"] = 42
return [metric, newmetric]
`,
},
{
name: "return original and deep-copy",
numMetrics: 2,
source: `
def apply(metric):
return [metric, deepcopy(metric, track=True)]
`,
},
{
name: "deep-copy but do not return",
numMetrics: 1,
source: `
def apply(metric):
x = deepcopy(metric)
return [metric]
`,
},
{
name: "deep-copy but do not return original metric",
numMetrics: 1,
source: `
def apply(metric):
x = deepcopy(metric, track=True)
return [x]
`,
},
}
Expand All @@ -3426,8 +3452,9 @@ def apply(metric):
plugin.Stop()

// Ensure we get back the correct number of metrics
require.Len(t, acc.GetTelegrafMetrics(), tt.numMetrics)
for _, m := range acc.GetTelegrafMetrics() {
actual := acc.GetTelegrafMetrics()
require.Lenf(t, actual, tt.numMetrics, "expected %d metrics but got %d", tt.numMetrics, len(actual))
for _, m := range actual {
m.Accept()
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/processors/starlark/testdata/value_filter.star
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Filter metrics by value
'''
In this example we look at the `value` field of the metric.
If the value is zeor, we delete all the fields, effectively dropping the metric.
If the value is zero, we delete all the fields, effectively dropping the metric.

Example Input:
temperature sensor="001A0",value=111.48 1618488000000000999
Expand Down
Loading