From a7dbe32862a5c7f31dbcbca35ac3c2dc6c424286 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 12:28:09 +0100 Subject: [PATCH 01/11] Add test-case for tail's CSV behavior. --- plugins/inputs/tail/tail_test.go | 91 ++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index e9e2bcdbfe1b3..008323ef3e908 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -9,10 +9,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/grok" @@ -688,6 +690,95 @@ func TestTailEOF(t *testing.T) { require.NoError(t, err) } +func TestCSVBehavior(t *testing.T) { + // Prepare the input file + input, err := os.CreateTemp("", "") + require.NoError(t, err) + defer os.Remove(input.Name()) + // Write header + _, err = input.WriteString("a,b\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + + // Setup the CSV parser creator function + parserFunc := func() (parsers.Parser, error) { + parser := &csv.Parser{ + MetricName: "tail", + HeaderRowCount: 1, + } + err := parser.Init() + return parser, err + } + + // Setup the plugin + plugin := &Tail{ + Files: []string{input.Name()}, + FromBeginning: true, + MaxUndeliveredLines: 1000, + offsets: make(map[string]int64, 0), + PathTag: "path", + Log: testutil.Logger{}, + } + plugin.SetParserFunc(parserFunc) + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Write the first line of data + _, err = input.WriteString("1,2\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + + // Write another line of data + _, err = input.WriteString("3,4\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= 2 + }, time.Second, 100*time.Millisecond, "Expected 2 metrics found %d", acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + expected := []telegraf.Metric{ + metric.New( + "tail", + map[string]string{ + "path": input.Name(), + }, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 0), + ), + metric.New( + "tail", + map[string]string{ + "path": input.Name(), + }, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 0), + ), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) + + // Close the input file + require.NoError(t, input.Close()) +} + func getTestdataDir() string { dir, err := os.Getwd() if err != nil { From 5e6863ffb52a2115f5a15abd2cd5ebb26ba94025 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 12:37:18 +0100 Subject: [PATCH 02/11] Add test-case for file's CSV behavior. --- plugins/inputs/file/file_test.go | 79 ++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index ffcc02dd1aa77..1dac3ea44f522 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -11,9 +11,11 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/json" @@ -371,3 +373,80 @@ func TestStatefulParsers(t *testing.T) { }) } } + +func TestCSVBehavior(t *testing.T) { + // Setup the CSV parser creator function + parserFunc := func() (telegraf.Parser, error) { + parser := &csv.Parser{ + MetricName: "file", + HeaderRowCount: 1, + } + err := parser.Init() + return parser, err + } + + // Setup the plugin + plugin := &File{ + Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")}, + } + plugin.SetParserFunc(parserFunc) + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} From 9c37487196f1a6e525b13329f31ca75ad8856db3 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 12:46:54 +0100 Subject: [PATCH 03/11] Add test-case for exec's CSV behavior. --- plugins/inputs/exec/exec_test.go | 78 ++++++++++++++++++++++++++++++++ plugins/inputs/tail/tail_test.go | 53 +++++++++++----------- 2 files changed, 105 insertions(+), 26 deletions(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 958f9dcac2f29..28e9171a1bde9 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -12,8 +12,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" @@ -299,3 +303,77 @@ func TestRemoveCarriageReturns(t *testing.T) { } } } + +func TestCSVBehavior(t *testing.T) { + // Setup the CSV parser + parser := &csv.Parser{ + MetricName: "exec", + HeaderRowCount: 1, + } + require.NoError(t, parser.Init()) + + // Setup the plugin + plugin := NewExec() + plugin.Commands = []string{"echo -e 'a,b\n1,2\n3,4'"} + plugin.Log = testutil.Logger{} + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 008323ef3e908..303a069f3ec2b 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -722,32 +722,6 @@ func TestCSVBehavior(t *testing.T) { plugin.SetParserFunc(parserFunc) require.NoError(t, plugin.Init()) - var acc testutil.Accumulator - require.NoError(t, plugin.Start(&acc)) - defer plugin.Stop() - - // Write the first line of data - _, err = input.WriteString("1,2\n") - require.NoError(t, err) - require.NoError(t, input.Sync()) - require.NoError(t, plugin.Gather(&acc)) - - // Write another line of data - _, err = input.WriteString("3,4\n") - require.NoError(t, err) - require.NoError(t, input.Sync()) - require.NoError(t, plugin.Gather(&acc)) - require.Eventuallyf(t, func() bool { - acc.Lock() - defer acc.Unlock() - return acc.NMetrics() >= 2 - }, time.Second, 100*time.Millisecond, "Expected 2 metrics found %d", acc.NMetrics()) - - // Check the result - options := []cmp.Option{ - testutil.SortMetrics(), - testutil.IgnoreTime(), - } expected := []telegraf.Metric{ metric.New( "tail", @@ -772,6 +746,33 @@ func TestCSVBehavior(t *testing.T) { time.Unix(0, 0), ), } + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Write the first line of data + _, err = input.WriteString("1,2\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + + // Write another line of data + _, err = input.WriteString("3,4\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, options...) From 5811be61df65cac7a0c0a7ae54105c7e5d698d5e Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 13:01:13 +0100 Subject: [PATCH 04/11] Add another test case to create the plugin from config to test the later fix. --- plugins/inputs/exec/exec_test.go | 80 ++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 28e9171a1bde9..482b6040c6a17 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -16,7 +16,9 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/value" @@ -309,6 +311,7 @@ func TestCSVBehavior(t *testing.T) { parser := &csv.Parser{ MetricName: "exec", HeaderRowCount: 1, + ResetMode: "always", } require.NoError(t, parser.Init()) @@ -377,3 +380,80 @@ func TestCSVBehavior(t *testing.T) { actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, options...) } + +func TestCases(t *testing.T) { + // Register the plugin + inputs.Add("exec", func() telegraf.Input { + return NewExec() + }) + + // Setup the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfigData([]byte(` + [[inputs.exec]] + commands = [ "echo -e 'a,b\n1,2\n3,4'" ] + data_format = "csv" + csv_header_row_count = 1 +`))) + require.Len(t, cfg.Inputs, 1) + plugin := cfg.Inputs[0] + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} From 92f833d60235585c3a4370a01a7db5d7f062e848 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 13:01:42 +0100 Subject: [PATCH 05/11] Fix exec behavior to always reset the csv parser. --- config/config.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/config/config.go b/config/config.go index 20c817fa40bec..493b1788790c2 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/secretstores" "github.com/influxdata/telegraf/plugins/serializers" @@ -882,6 +883,14 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) } parser := creator(parentname) + // Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022) + if dataformat == "csv" && parentcategory == "inputs" { + if parentname == "exec" { + csv := parser.(*csv.Parser) + csv.ResetMode = "always" + } + } + conf := c.buildParser(parentname, table) if err := c.toml.UnmarshalTable(table, parser); err != nil { return nil, err From bdc94ad687cce54f62c2b8a847f8e745a3fbf6a6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 14:27:45 +0100 Subject: [PATCH 06/11] Fix linter issue. --- config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 493b1788790c2..b4ccf9fe0e055 100644 --- a/config/config.go +++ b/config/config.go @@ -886,8 +886,8 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) // Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022) if dataformat == "csv" && parentcategory == "inputs" { if parentname == "exec" { - csv := parser.(*csv.Parser) - csv.ResetMode = "always" + csvParser := parser.(*csv.Parser) + csvParser.ResetMode = "always" } } From 31a8f3284553e23c5e16aff20a825ae9698f038b Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 14:34:35 +0100 Subject: [PATCH 07/11] Add missing test file. --- plugins/inputs/file/testdata/csv_behavior_input.csv | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 plugins/inputs/file/testdata/csv_behavior_input.csv diff --git a/plugins/inputs/file/testdata/csv_behavior_input.csv b/plugins/inputs/file/testdata/csv_behavior_input.csv new file mode 100644 index 0000000000000..0099ae9378f99 --- /dev/null +++ b/plugins/inputs/file/testdata/csv_behavior_input.csv @@ -0,0 +1,3 @@ +a,b +1,2 +3,4 From 6d9f40d131c2d16fbcfdab320d09316917c40cf7 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 15:44:21 +0100 Subject: [PATCH 08/11] Try double-quotes to fix echo on MacOS. --- plugins/inputs/exec/exec_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 482b6040c6a17..76b9853415936 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -391,7 +391,7 @@ func TestCases(t *testing.T) { cfg := config.NewConfig() require.NoError(t, cfg.LoadConfigData([]byte(` [[inputs.exec]] - commands = [ "echo -e 'a,b\n1,2\n3,4'" ] + commands = [ 'echo -e "a,b\n1,2\n3,4"' ] data_format = "csv" csv_header_row_count = 1 `))) From f50b50043c3719bee20bf34c9c97496b0838b952 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Jan 2023 15:45:17 +0100 Subject: [PATCH 09/11] Try double-quotes to fix echo on MacOS another instance. --- plugins/inputs/exec/exec_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 76b9853415936..7ecb3ed54fa66 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -317,7 +317,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := NewExec() - plugin.Commands = []string{"echo -e 'a,b\n1,2\n3,4'"} + plugin.Commands = []string{`echo -e "a,b\n1,2\n3,4"`} plugin.Log = testutil.Logger{} plugin.SetParser(parser) require.NoError(t, plugin.Init()) From 812dca987a553b2b4d52ff197497ad25e868583b Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Thu, 26 Jan 2023 13:16:05 -0700 Subject: [PATCH 10/11] tests: fix quoting to make MacOS happy --- plugins/inputs/exec/exec_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 7ecb3ed54fa66..5ac175ca70abc 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -317,7 +317,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := NewExec() - plugin.Commands = []string{`echo -e "a,b\n1,2\n3,4"`} + plugin.Commands = []string{"echo -e \"a,b\n1,2\n3,4\""} plugin.Log = testutil.Logger{} plugin.SetParser(parser) require.NoError(t, plugin.Init()) @@ -391,7 +391,7 @@ func TestCases(t *testing.T) { cfg := config.NewConfig() require.NoError(t, cfg.LoadConfigData([]byte(` [[inputs.exec]] - commands = [ 'echo -e "a,b\n1,2\n3,4"' ] + commands = [ "echo \"a,b\n1,2\n3,4\"" ] data_format = "csv" csv_header_row_count = 1 `))) From 09e85c8157dc0f98b9960a093247ba7a69584fe3 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Thu, 26 Jan 2023 13:29:03 -0700 Subject: [PATCH 11/11] test: remove the -e --- plugins/inputs/exec/exec_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 5ac175ca70abc..3ac20f0feac05 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -317,7 +317,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := NewExec() - plugin.Commands = []string{"echo -e \"a,b\n1,2\n3,4\""} + plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} plugin.Log = testutil.Logger{} plugin.SetParser(parser) require.NoError(t, plugin.Init())