From 87039464470e5ed02c7ff6c0706317d23c25a25a Mon Sep 17 00:00:00 2001 From: Christos Markou Date: Thu, 7 Nov 2024 16:00:08 +0200 Subject: [PATCH] [pkg/stanza] Ensure time parsing happens before entry is sent downwards (#36213) #### Description This issue was caught at https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35758. This PR ensures that time parsing happens before the entry is sent to the next operator in the pipeline. #### Link to tracking issue Fixes ~ #### Testing Added #### Documentation ~ Signed-off-by: ChrsMark --- .chloggen/fix_container_time_parsing.yaml | 27 ++++++++ .../operator/parser/container/parser.go | 33 +++++----- .../operator/parser/container/parser_test.go | 63 +++++++++++++++++++ 3 files changed, 104 insertions(+), 19 deletions(-) create mode 100644 .chloggen/fix_container_time_parsing.yaml diff --git a/.chloggen/fix_container_time_parsing.yaml b/.chloggen/fix_container_time_parsing.yaml new file mode 100644 index 000000000000..6c108aed5561 --- /dev/null +++ b/.chloggen/fix_container_time_parsing.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Ensure that time parsing happens before entry is sent to downstream operators + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36213] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 5c33005435f4..b53c025869df 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -63,12 +63,11 @@ type Parser struct { asyncConsumerStarted bool criConsumerStartOnce sync.Once criConsumers *sync.WaitGroup + timeLayout string } // Process will parse an entry of Container logs func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { - var timeLayout string - format := p.format if format == "" { format, err = p.detectFormat(entry) @@ -79,15 +78,11 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { switch format { case dockerFormat: - err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings) + p.timeLayout = goTimeLayout + err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleTimeAndAttributeMappings) if err != nil { return fmt.Errorf("failed to process the docker log: %w", err) } - timeLayout = goTimeLayout - err = parseTime(entry, timeLayout) - if err != nil { - return fmt.Errorf("failed to parse time: %w", err) - } case containerdFormat, crioFormat: p.criConsumerStartOnce.Do(func() { err = p.criLogEmitter.Start(nil) @@ -119,22 +114,17 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { if err != nil { return fmt.Errorf("failed to parse containerd log: %w", err) } - timeLayout = goTimeLayout + p.timeLayout = goTimeLayout } else { // parse the message err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO) if err != nil { return fmt.Errorf("failed to parse crio log: %w", err) } - timeLayout = crioTimeLayout + p.timeLayout = crioTimeLayout } - err = parseTime(entry, timeLayout) - if err != nil { - return fmt.Errorf("failed to parse time: %w", err) - } - - err = p.handleAttributeMappings(entry) + err = p.handleTimeAndAttributeMappings(entry) if err != nil { return fmt.Errorf("failed to handle attribute mappings: %w", err) } @@ -251,9 +241,14 @@ func (p *Parser) parseDocker(value any) (any, error) { return parsedValue, nil } -// handleAttributeMappings handles fields' mappings and k8s meta extraction -func (p *Parser) handleAttributeMappings(e *entry.Entry) error { - err := p.handleMoveAttributes(e) +// handleTimeAndAttributeMappings handles fields' mappings and k8s meta extraction +func (p *Parser) handleTimeAndAttributeMappings(e *entry.Entry) error { + err := parseTime(e, p.timeLayout) + if err != nil { + return fmt.Errorf("failed to parse time: %w", err) + } + + err = p.handleMoveAttributes(e) if err != nil { return err } diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index 9c684e74d31a..93e769c23f4b 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -408,6 +408,69 @@ func TestRecombineProcess(t *testing.T) { } } +func TestProcessWithDockerTime(t *testing.T) { + cases := []struct { + name string + op func() (operator.Operator, error) + input *entry.Entry + expectedOutput *entry.Entry + }{ + { + "docker", + func() (operator.Operator, error) { + cfg := NewConfigWithID("test_id") + cfg.AddMetadataFromFilePath = true + set := componenttest.NewNopTelemetrySettings() + return cfg.Build(set) + }, + &entry.Entry{ + Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`, + Attributes: map[string]any{ + "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log", + }, + }, + &entry.Entry{ + Attributes: map[string]any{ + "log.iostream": "stdout", + "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log", + }, + Body: "INFO: log line here", + Resource: map[string]any{ + "k8s.pod.name": "kube-scheduler-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3", + "k8s.container.name": "kube-scheduler44", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + }, + Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + op, err := tc.op() + require.NoError(t, err) + defer func() { require.NoError(t, op.Stop()) }() + r := op.(*Parser) + + fake := testutil.NewFakeOutput(t) + r.OutputOperators = ([]operator.Operator{fake}) + + require.NoError(t, r.Process(ctx, tc.input)) + + fake.ExpectEntry(t, tc.expectedOutput) + + select { + case e := <-fake.Received: + require.FailNow(t, "Received unexpected entry: ", e) + default: + } + }) + } +} + func TestCRIRecombineProcessWithFailedDownstreamOperator(t *testing.T) { cases := []struct { name string