Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.exec): Restore pre-v1.21 behavior for CSV data_format #12533

Merged
merged 11 commits into from
Jan 26, 2023
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/secretstores"
"github.com/influxdata/telegraf/plugins/serializers"
Expand Down Expand Up @@ -882,6 +883,14 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
}
parser := creator(parentname)

// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
if dataformat == "csv" && parentcategory == "inputs" {
if parentname == "exec" {
csvParser := parser.(*csv.Parser)
csvParser.ResetMode = "always"
}
}

conf := c.buildParser(parentname, table)
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
Expand Down
158 changes: 158 additions & 0 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -299,3 +305,155 @@ func TestRemoveCarriageReturns(t *testing.T) {
}
}
}

func TestCSVBehavior(t *testing.T) {
// Setup the CSV parser
parser := &csv.Parser{
MetricName: "exec",
HeaderRowCount: 1,
ResetMode: "always",
}
require.NoError(t, parser.Init())

// Setup the plugin
plugin := NewExec()
plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""}
plugin.Log = testutil.Logger{}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())

expected := []telegraf.Metric{
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}

var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())

// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}

func TestCases(t *testing.T) {
// Register the plugin
inputs.Add("exec", func() telegraf.Input {
return NewExec()
})

// Setup the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfigData([]byte(`
[[inputs.exec]]
commands = [ "echo \"a,b\n1,2\n3,4\"" ]
data_format = "csv"
csv_header_row_count = 1
`)))
require.Len(t, cfg.Inputs, 1)
plugin := cfg.Inputs[0]
require.NoError(t, plugin.Init())

expected := []telegraf.Metric{
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}

var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())

// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}
79 changes: 79 additions & 0 deletions plugins/inputs/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/json"
Expand Down Expand Up @@ -371,3 +373,80 @@ func TestStatefulParsers(t *testing.T) {
})
}
}

func TestCSVBehavior(t *testing.T) {
// Setup the CSV parser creator function
parserFunc := func() (telegraf.Parser, error) {
parser := &csv.Parser{
MetricName: "file",
HeaderRowCount: 1,
}
err := parser.Init()
return parser, err
}

// Setup the plugin
plugin := &File{
Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")},
}
plugin.SetParserFunc(parserFunc)
require.NoError(t, plugin.Init())

expected := []telegraf.Metric{
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}

var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())

// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}
3 changes: 3 additions & 0 deletions plugins/inputs/file/testdata/csv_behavior_input.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
a,b
1,2
3,4
Loading