Skip to content

Commit

Permalink
[receiver/windowseventlog] Propogate errors during parsing (#35461)
Browse files Browse the repository at this point in the history
This PR contains some cleanup following #34720.

This slightly changes some log messages. It also propagates errors
encountered by sending to the next operator (which was done for other
operators in #33847).
  • Loading branch information
djaglowski authored Oct 15, 2024
1 parent 844d663 commit a7b0c3a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/wel-error-cleanup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/windowseventlog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Errors returned when passing data downstream will now be propagated correctly.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35461]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
58 changes: 28 additions & 30 deletions pkg/stanza/operator/input/windows/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Input struct {
remote RemoteConfig
remoteSessionHandle windows.Handle
startRemoteSession func() error
processEvent func(context.Context, Event)
processEvent func(context.Context, Event) error
}

// newInput creates a new Input operator.
Expand Down Expand Up @@ -219,7 +219,9 @@ func (i *Input) read(ctx context.Context) {
}

for n, event := range events {
i.processEvent(ctx, event)
if err := i.processEvent(ctx, event); err != nil {
i.Logger().Error("process event", zap.Error(err))
}
if len(events) == n+1 {
i.updateBookmarkOffset(ctx, event)
}
Expand All @@ -240,70 +242,66 @@ func (i *Input) getPublisherName(event Event) (name string, excluded bool) {
return providerName, false
}

func (i *Input) renderSimpleAndSend(ctx context.Context, event Event) {
func (i *Input) renderSimpleAndSend(ctx context.Context, event Event) error {
simpleEvent, err := event.RenderSimple(i.buffer)
if err != nil {
i.Logger().Error("Failed to render simple event", zap.Error(err))
return
return fmt.Errorf("render simple event: %w", err)
}
i.sendEvent(ctx, simpleEvent)
return i.sendEvent(ctx, simpleEvent)
}

func (i *Input) renderDeepAndSend(ctx context.Context, event Event, publisher Publisher) {
func (i *Input) renderDeepAndSend(ctx context.Context, event Event, publisher Publisher) error {
deepEvent, err := event.RenderDeep(i.buffer, publisher)
if err == nil {
i.sendEvent(ctx, deepEvent)
return
return i.sendEvent(ctx, deepEvent)
}
i.Logger().Error("Failed to render formatted event", zap.Error(err))
i.renderSimpleAndSend(ctx, event)
return errors.Join(
fmt.Errorf("render deep event: %w", err),
i.renderSimpleAndSend(ctx, event),
)
}

// processEvent will process and send an event retrieved from windows event log.
func (i *Input) processEventWithoutRenderingInfo(ctx context.Context, event Event) {
func (i *Input) processEventWithoutRenderingInfo(ctx context.Context, event Event) error {
if len(i.excludeProviders) == 0 {
i.renderSimpleAndSend(ctx, event)
return
return i.renderSimpleAndSend(ctx, event)
}
if _, exclude := i.getPublisherName(event); exclude {
return
return nil
}
i.renderSimpleAndSend(ctx, event)
return i.renderSimpleAndSend(ctx, event)
}

func (i *Input) processEventWithRenderingInfo(ctx context.Context, event Event) {
func (i *Input) processEventWithRenderingInfo(ctx context.Context, event Event) error {
providerName, exclude := i.getPublisherName(event)
if exclude {
return
return nil
}

publisher, err := i.publisherCache.get(providerName)
if err != nil {
i.Logger().Warn(
"Failed to open event source, respective log entries cannot be formatted",
zap.String("provider", providerName), zap.Error(err))
i.renderSimpleAndSend(ctx, event)
return
return errors.Join(
fmt.Errorf("open event source for provider %q: %w", providerName, err),
i.renderSimpleAndSend(ctx, event),
)
}

if publisher.Valid() {
i.renderDeepAndSend(ctx, event, publisher)
return
return i.renderDeepAndSend(ctx, event, publisher)
}
i.renderSimpleAndSend(ctx, event)
return i.renderSimpleAndSend(ctx, event)
}

// sendEvent will send EventXML as an entry to the operator's output.
func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) {
func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) error {
var body any = eventXML.Original
if !i.raw {
body = formattedBody(eventXML)
}

e, err := i.NewEntry(body)
if err != nil {
i.Logger().Error("Failed to create entry", zap.Error(err))
return
return fmt.Errorf("create entry: %w", err)
}

e.Timestamp = parseTimestamp(eventXML.TimeCreated.SystemTime)
Expand All @@ -313,7 +311,7 @@ func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) {
e.Attributes["server.address"] = i.remote.Server
}

_ = i.Write(ctx, e)
return i.Write(ctx, e)
}

// getBookmarkXML will get the bookmark xml from the offsets database.
Expand Down

0 comments on commit a7b0c3a

Please sign in to comment.