diff --git a/.chloggen/pkg-stanza-error-handling.yaml b/.chloggen/pkg-stanza-error-handling.yaml new file mode 100644 index 000000000000..574a1a34becb --- /dev/null +++ b/.chloggen/pkg-stanza-error-handling.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Honor `send_quiet`/`drop_quiet` setting for errors occur in container operator + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35726] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index f46c55f970b7..760b249fee78 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -125,6 +125,32 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E return p.Write(ctx, entry) } +func (p *ParserOperator) ParseWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error { + // Short circuit if the "if" condition does not match + skip, err := p.Skip(ctx, entry) + if err != nil { + return p.HandleEntryError(ctx, entry, err) + } + if skip { + return p.Write(ctx, entry) + } + + if err = p.ParseWith(ctx, entry, parse); err != nil { + if p.OnError == DropOnErrorQuiet || p.OnError == SendOnErrorQuiet { + return nil + } + return err + } + if cb != nil { + err = cb(entry) + if err != nil { + return p.HandleEntryError(ctx, entry, err) + } + } + + return nil +} + // ParseWith will process an entry's field with a parser function. func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error { value, ok := entry.Get(p.ParseFrom) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 5c33005435f4..5a39244b0313 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -17,6 +17,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + stanzaerr "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -67,27 +68,19 @@ type Parser struct { // Process will parse an entry of Container logs func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { - var timeLayout string format := p.format if format == "" { format, err = p.detectFormat(entry) if err != nil { - return fmt.Errorf("failed to detect a valid container log format: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "invalid container log format")) } } switch format { case dockerFormat: - err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings) - if err != nil { - return fmt.Errorf("failed to process the docker log: %w", err) - } - timeLayout = goTimeLayout - err = parseTime(entry, timeLayout) - if err != nil { - return fmt.Errorf("failed to parse time: %w", err) - } + return p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.cbGoTimeLayout) + case containerdFormat, crioFormat: p.criConsumerStartOnce.Do(func() { err = p.criLogEmitter.Start(nil) @@ -104,48 +97,28 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { p.asyncConsumerStarted = true }) - // Short circuit if the "if" condition does not match - skip, err := p.Skip(ctx, entry) - if err != nil { - return p.HandleEntryError(ctx, entry, err) - } - if skip { - return p.Write(ctx, entry) - } - if format == containerdFormat { // parse the message - err = p.ParserOperator.ParseWith(ctx, entry, p.parseContainerd) + err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseContainerd, p.cbGoTimeLayout) if err != nil { - return fmt.Errorf("failed to parse containerd log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "containerd log")) } - timeLayout = goTimeLayout + } else { // parse the message - err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO) + err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseCRIO, p.cbCRITimeLayout) if err != nil { - return fmt.Errorf("failed to parse crio log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse crio log")) } - timeLayout = crioTimeLayout - } - - err = parseTime(entry, timeLayout) - if err != nil { - return fmt.Errorf("failed to parse time: %w", err) - } - - err = p.handleAttributeMappings(entry) - if err != nil { - return fmt.Errorf("failed to handle attribute mappings: %w", err) } // send it to the recombine operator err = p.recombineParser.Process(ctx, entry) if err != nil { - return fmt.Errorf("failed to recombine the crio log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "recombine the cri log")) } default: - return fmt.Errorf("failed to detect a valid container log format") + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "invalid container log format")) } return nil @@ -265,6 +238,28 @@ func (p *Parser) handleAttributeMappings(e *entry.Entry) error { return nil } +func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { + return p.handleTimeAndAttributes(e, goTimeLayout) + +} + +func (p *Parser) cbCRITimeLayout(e *entry.Entry) error { + return p.handleTimeAndAttributes(e, crioTimeLayout) +} + +func (p *Parser) handleTimeAndAttributes(e *entry.Entry, layout string) error { + err := parseTime(e, layout) + if err != nil { + return fmt.Errorf("failed to parse time: %w", err) + } + + err = p.handleAttributeMappings(e) + if err != nil { + return fmt.Errorf("failed to handle attribute mappings: %w", err) + } + return nil +} + // handleMoveAttributes moves fields to final attributes func (p *Parser) handleMoveAttributes(e *entry.Entry) error { // move `log` to `body` explicitly first to avoid diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index 9c684e74d31a..75a01e0a0f1c 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -88,6 +88,34 @@ func TestInternalRecombineCfg(t *testing.T) { require.Equal(t, expected, cfg) } +func TestProcessOnErrorSendQuiet(t *testing.T) { + // contains incorrect time format + faultyEntry := &entry.Entry{ + Body: `{"log":"INFO: log line here","stream":"stdout","time":"2023033"}`, + } + expectedError := "parsing time \"2023033\" as \"2006-01-02T15:04:05.999Z\": cannot parse \"033\" as \"-\"" + + cfg := NewConfigWithID("test_id") + cfg.AddMetadataFromFilePath = false + set := componenttest.NewNopTelemetrySettings() + + t.Run("without send_quiet", func(t *testing.T) { + op, err := cfg.Build(set) + require.NoError(t, err, "incorrect build") + err = op.Process(context.Background(), faultyEntry) + require.ErrorContains(t, err, expectedError, "on_error is not working correctly") + }) + + t.Run("with send_quiet", func(t *testing.T) { + cfg.OnError = "send_quiet" + op, err := cfg.Build(set) + require.NoError(t, err, "incorrect build") + err = op.Process(context.Background(), faultyEntry) + require.NoError(t, err, "send_quiet is not working correctly") + }) + +} + func TestProcess(t *testing.T) { cases := []struct { name string