Skip to content

Commit

Permalink
feat(agent): Add option to skip re-running processors after aggregato…
Browse files Browse the repository at this point in the history
…rs (#14882)
  • Loading branch information
powersj authored Feb 23, 2024
1 parent 3929a42 commit f0656a4
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 6 deletions.
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (a *Agent) Run(ctx context.Context) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<-
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down Expand Up @@ -1112,7 +1112,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
Expand Down
4 changes: 1 addition & 3 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ func TestCases(t *testing.T) {
// Process expected metrics and compare with resulting metrics
options := []cmp.Option{
testutil.IgnoreTags("host"),
}
if expected[0].Time().IsZero() {
options = append(options, testutil.IgnoreTime())
testutil.IgnoreTime(),
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
Expand Down
2 changes: 2 additions & 0 deletions agent/testcases/aggregators-rerun-processors/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
metric value=420
metric value_min=4200,value_max=4200
1 change: 1 addition & 0 deletions agent/testcases/aggregators-rerun-processors/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
metric value=42.0
22 changes: 22 additions & 0 deletions agent/testcases/aggregators-rerun-processors/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Test for not skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = false

[[inputs.file]]
files = ["testcases/aggregators-rerun-processors/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''

[[aggregators.minmax]]
period = "1s"
drop_original = false

2 changes: 2 additions & 0 deletions agent/testcases/aggregators-skip-processors/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
metric value=420
metric value_min=420,value_max=420
1 change: 1 addition & 0 deletions agent/testcases/aggregators-skip-processors/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
metric value=42.0
22 changes: 22 additions & 0 deletions agent/testcases/aggregators-skip-processors/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Test for skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = true

[[inputs.file]]
files = ["testcases/aggregators-skip-processors/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''

[[aggregators.minmax]]
period = "1s"
drop_original = false

5 changes: 5 additions & 0 deletions cmd/telegraf/agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@
## stateful plugins on termination of Telegraf. If the file exists on start,
## the state in the file will be restored for the plugins.
# statefile = ""

## Flag to skip running processors after aggregators
## By default, processors are run a second time after aggregators. Changing
## this setting to true will skip the second run of processors.
# skip_processors_after_aggregators = false
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ type AgentConfig struct {
// Flag to always keep tags explicitly defined in the global tags section
// and ensure those tags always pass filtering.
AlwaysIncludeGlobalTags bool `toml:"always_include_global_tags"`

// Flag to skip running processors after aggregators
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down

0 comments on commit f0656a4

Please sign in to comment.