Skip to content

Commit

Permalink
[pkg/stanza] Fix recombine issues
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Feb 1, 2024
1 parent e45a5c2 commit 7ae9094
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
17 changes: 3 additions & 14 deletions pkg/stanza/operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (r *Transformer) Stop() error {
r.flushAllSources(ctx)

close(r.chClose)

return nil
}

Expand Down Expand Up @@ -239,18 +238,17 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {

switch {
// This is the first entry in the next batch
case matches && r.matchIndicatesFirst():
case matches && r.matchFirstLine:
// Flush the existing batch
err := r.flushSource(ctx, s)
if err != nil {
if err := r.flushSource(ctx, s); err != nil {
return err
}

// Add the current log to the new batch
r.addToBatch(ctx, e, s)
return nil
// This is the last entry in a complete batch
case matches && r.matchIndicatesLast():
case matches && !r.matchFirstLine:
r.addToBatch(ctx, e, s)
return r.flushSource(ctx, s)
}
Expand All @@ -261,14 +259,6 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
return nil
}

func (r *Transformer) matchIndicatesFirst() bool {
return r.matchFirstLine
}

func (r *Transformer) matchIndicatesLast() bool {
return !r.matchFirstLine
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
batch, ok := r.batchMap[source]
Expand Down Expand Up @@ -303,7 +293,6 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str
r.Errorf("there was error flushing combined logs %s", err)
}
}

}

// flushAllSources flushes all sources.
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand All @@ -178,7 +178,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -219,8 +219,8 @@ func TestTransformer(t *testing.T) {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = `body matches "^[^\\s]"`
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -252,8 +252,8 @@ func TestTransformer(t *testing.T) {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField("message")
cfg.IsFirstEntry = `body.message matches "^[^\\s]"`
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -287,7 +287,6 @@ func TestTransformer(t *testing.T) {
cfg.CombineWith = ""
cfg.IsLastEntry = "body.logtag == 'F'"
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand Down Expand Up @@ -501,17 +500,18 @@ func TestTransformer(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
op, err := tc.config.Build(testutil.Logger(t))
require.NoError(t, err)
require.NoError(t, op.Start(testutil.NewUnscopedMockPersister()))
recombine := op.(*Transformer)
r := op.(*Transformer)

fake := testutil.NewFakeOutput(t)
err = recombine.SetOutputs([]operator.Operator{fake})
err = r.SetOutputs([]operator.Operator{fake})
require.NoError(t, err)

for _, e := range tc.input {
require.NoError(t, recombine.Process(context.Background(), e))
require.NoError(t, r.Process(ctx, e))
}

fake.ExpectEntries(t, tc.expectedOutput)
Expand Down

0 comments on commit 7ae9094

Please sign in to comment.