Skip to content

Commit

Permalink
refactor & simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
colelaven committed Oct 4, 2024
1 parent 444dea3 commit 6d0ba56
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
10 changes: 7 additions & 3 deletions processor/logdedupprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ func createLogsProcessor(_ context.Context, settings processor.Settings, cfg com
}

if processorCfg.Condition == defaultCondition {
processor.matchFunc = processor.dedupAll
processor.condition = nil
} else {
condition, err := filterottl.NewBoolExprForLog([]string{processorCfg.Condition}, filterottl.StandardLogFuncs(), ottl.PropagateError, settings.TelemetrySettings)
condition, err := filterottl.NewBoolExprForLog(
[]string{processorCfg.Condition},
filterottl.StandardLogFuncs(),
ottl.PropagateError,
settings.TelemetrySettings,
)
if err != nil {
return nil, fmt.Errorf("invalid condition: %w", err)
}
processor.condition = condition
processor.matchFunc = processor.dedupMatches
}

return processor, nil
Expand Down
34 changes: 16 additions & 18 deletions processor/logdedupprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
type logDedupProcessor struct {
emitInterval time.Duration
condition expr.BoolExpr[ottllog.TransformContext]
matchFunc func(context.Context, plog.LogRecord, pcommon.InstrumentationScope, pcommon.Resource, plog.ScopeLogs, plog.ResourceLogs) bool
aggregator *logAggregator
remover *fieldRemover
nextConsumer consumer.Logs
Expand Down Expand Up @@ -97,7 +96,21 @@ func (p *logDedupProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error
logs := sl.LogRecords()

logs.RemoveIf(func(logRecord plog.LogRecord) bool {
return p.matchFunc(ctx, logRecord, scope, resource, sl, rl)
if p.condition == nil {
p.aggregateLog(logRecord, scope, resource)
return true
}

logCtx := ottllog.NewTransformContext(logRecord, scope, resource, sl, rl)
logMatch, err := p.condition.Eval(ctx, logCtx)
if err != nil {
p.logger.Error("error matching condition", zap.Error(err))
return false
}
if logMatch {
p.aggregateLog(logRecord, scope, resource)
}
return logMatch
})
}
}
Expand All @@ -113,24 +126,9 @@ func (p *logDedupProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error
return nil
}

func (p *logDedupProcessor) dedupMatches(ctx context.Context, logRecord plog.LogRecord, scope pcommon.InstrumentationScope, resource pcommon.Resource, sl plog.ScopeLogs, rl plog.ResourceLogs) bool {
logCtx := ottllog.NewTransformContext(logRecord, scope, resource, sl, rl)
logMatch, err := p.condition.Eval(ctx, logCtx)
if err != nil {
p.logger.Error("error matching condition", zap.Error(err))
return false
}
if logMatch {
p.remover.RemoveFields(logRecord)
p.aggregator.Add(resource, scope, logRecord)
}
return logMatch
}

func (p *logDedupProcessor) dedupAll(_ context.Context, logRecord plog.LogRecord, scope pcommon.InstrumentationScope, resource pcommon.Resource, _ plog.ScopeLogs, _ plog.ResourceLogs) bool {
func (p *logDedupProcessor) aggregateLog(logRecord plog.LogRecord, scope pcommon.InstrumentationScope, resource pcommon.Resource) {
p.remover.RemoveFields(logRecord)
p.aggregator.Add(resource, scope, logRecord)
return true
}

// handleExportInterval sends metrics at the configured interval.
Expand Down
9 changes: 4 additions & 5 deletions processor/logdedupprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestProcessorShutdownCtxError(t *testing.T) {

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.matchFunc = p.dedupAll
p.condition = nil
require.NoError(t, err)

// Start then stop the processor checking for errors
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestShutdownBeforeStart(t *testing.T) {

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.matchFunc = p.dedupAll
p.condition = nil
require.NoError(t, err)
require.NotPanics(t, func() {
err := p.Shutdown(context.Background())
Expand All @@ -149,7 +149,7 @@ func TestProcessorConsume(t *testing.T) {

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.matchFunc = p.dedupAll
p.condition = nil
require.NoError(t, err)

err = p.Start(context.Background(), componenttest.NewNopHost())
Expand Down Expand Up @@ -213,7 +213,7 @@ func Test_unsetLogsAreExportedOnShutdown(t *testing.T) {

// Create & start a processor
p, err := newProcessor(cfg, logsSink, processortest.NewNopSettings())
p.matchFunc = p.dedupAll
p.condition = nil
require.NoError(t, err)
err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
Expand Down Expand Up @@ -252,7 +252,6 @@ func TestProcessorConsumeCondition(t *testing.T) {
// Create a processor
p, err := newProcessor(cfg, logsSink, processortest.NewNopSettings())
p.condition = getCondition(t, cfg.Condition)
p.matchFunc = p.dedupMatches
require.NoError(t, err)

err = p.Start(context.Background(), componenttest.NewNopHost())
Expand Down

0 comments on commit 6d0ba56

Please sign in to comment.