Skip to content

Commit

Permalink
[pkg/stanza] Ensure time parsing happens before entry is sent downwar…
Browse files Browse the repository at this point in the history
…ds (open-telemetry#36213)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This issue was caught at
open-telemetry#35758.
This PR ensures that time parsing happens before the entry is sent to
the next operator in the pipeline.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes ~

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added

<!--Describe the documentation added.-->
#### Documentation
~

<!--Please delete paragraphs that you did not use before submitting.-->

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark authored and sbylica-splunk committed Dec 17, 2024
1 parent 6efcc70 commit 8703946
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 19 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_container_time_parsing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure that time parsing happens before entry is sent to downstream operators

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36213]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
33 changes: 14 additions & 19 deletions pkg/stanza/operator/parser/container/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ type Parser struct {
asyncConsumerStarted bool
criConsumerStartOnce sync.Once
criConsumers *sync.WaitGroup
timeLayout string
}

// Process will parse an entry of Container logs
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
var timeLayout string

format := p.format
if format == "" {
format, err = p.detectFormat(entry)
Expand All @@ -79,15 +78,11 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {

switch format {
case dockerFormat:
err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings)
p.timeLayout = goTimeLayout
err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleTimeAndAttributeMappings)
if err != nil {
return fmt.Errorf("failed to process the docker log: %w", err)
}
timeLayout = goTimeLayout
err = parseTime(entry, timeLayout)
if err != nil {
return fmt.Errorf("failed to parse time: %w", err)
}
case containerdFormat, crioFormat:
p.criConsumerStartOnce.Do(func() {
err = p.criLogEmitter.Start(nil)
Expand Down Expand Up @@ -119,22 +114,17 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
if err != nil {
return fmt.Errorf("failed to parse containerd log: %w", err)
}
timeLayout = goTimeLayout
p.timeLayout = goTimeLayout
} else {
// parse the message
err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO)
if err != nil {
return fmt.Errorf("failed to parse crio log: %w", err)
}
timeLayout = crioTimeLayout
p.timeLayout = crioTimeLayout
}

err = parseTime(entry, timeLayout)
if err != nil {
return fmt.Errorf("failed to parse time: %w", err)
}

err = p.handleAttributeMappings(entry)
err = p.handleTimeAndAttributeMappings(entry)
if err != nil {
return fmt.Errorf("failed to handle attribute mappings: %w", err)
}
Expand Down Expand Up @@ -251,9 +241,14 @@ func (p *Parser) parseDocker(value any) (any, error) {
return parsedValue, nil
}

// handleAttributeMappings handles fields' mappings and k8s meta extraction
func (p *Parser) handleAttributeMappings(e *entry.Entry) error {
err := p.handleMoveAttributes(e)
// handleTimeAndAttributeMappings handles fields' mappings and k8s meta extraction
func (p *Parser) handleTimeAndAttributeMappings(e *entry.Entry) error {
err := parseTime(e, p.timeLayout)
if err != nil {
return fmt.Errorf("failed to parse time: %w", err)
}

err = p.handleMoveAttributes(e)
if err != nil {
return err
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/stanza/operator/parser/container/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,69 @@ func TestRecombineProcess(t *testing.T) {
}
}

func TestProcessWithDockerTime(t *testing.T) {
cases := []struct {
name string
op func() (operator.Operator, error)
input *entry.Entry
expectedOutput *entry.Entry
}{
{
"docker",
func() (operator.Operator, error) {
cfg := NewConfigWithID("test_id")
cfg.AddMetadataFromFilePath = true
set := componenttest.NewNopTelemetrySettings()
return cfg.Build(set)
},
&entry.Entry{
Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`,
Attributes: map[string]any{
"log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
},
},
&entry.Entry{
Attributes: map[string]any{
"log.iostream": "stdout",
"log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
},
Body: "INFO: log line here",
Resource: map[string]any{
"k8s.pod.name": "kube-scheduler-kind-control-plane",
"k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
"k8s.container.name": "kube-scheduler44",
"k8s.container.restart_count": "1",
"k8s.namespace.name": "some",
},
Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC),
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
op, err := tc.op()
require.NoError(t, err)
defer func() { require.NoError(t, op.Stop()) }()
r := op.(*Parser)

fake := testutil.NewFakeOutput(t)
r.OutputOperators = ([]operator.Operator{fake})

require.NoError(t, r.Process(ctx, tc.input))

fake.ExpectEntry(t, tc.expectedOutput)

select {
case e := <-fake.Received:
require.FailNow(t, "Received unexpected entry: ", e)
default:
}
})
}
}

func TestCRIRecombineProcessWithFailedDownstreamOperator(t *testing.T) {
cases := []struct {
name string
Expand Down

0 comments on commit 8703946

Please sign in to comment.