Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanza] Various code cleanup #30784

Merged
merged 10 commits into from
Feb 12, 2024
17 changes: 3 additions & 14 deletions pkg/stanza/operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (r *Transformer) Stop() error {
r.flushAllSources(ctx)

close(r.chClose)

return nil
}

Expand Down Expand Up @@ -239,18 +238,17 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {

switch {
// This is the first entry in the next batch
case matches && r.matchIndicatesFirst():
case matches && r.matchFirstLine:
// Flush the existing batch
err := r.flushSource(ctx, s)
if err != nil {
if err := r.flushSource(ctx, s); err != nil {
return err
}

// Add the current log to the new batch
r.addToBatch(ctx, e, s)
return nil
// This is the last entry in a complete batch
case matches && r.matchIndicatesLast():
case matches && !r.matchFirstLine:
r.addToBatch(ctx, e, s)
return r.flushSource(ctx, s)
}
Expand All @@ -261,14 +259,6 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
return nil
}

func (r *Transformer) matchIndicatesFirst() bool {
return r.matchFirstLine
}

func (r *Transformer) matchIndicatesLast() bool {
return !r.matchFirstLine
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
batch, ok := r.batchMap[source]
Expand Down Expand Up @@ -303,7 +293,6 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str
r.Errorf("there was error flushing combined logs %s", err)
}
}

}

// flushAllSources flushes all sources.
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand All @@ -178,7 +178,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -219,8 +219,8 @@ func TestTransformer(t *testing.T) {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = `body matches "^[^\\s]"`
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -252,8 +252,8 @@ func TestTransformer(t *testing.T) {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField("message")
cfg.IsFirstEntry = `body.message matches "^[^\\s]"`
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 10 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand Down Expand Up @@ -287,7 +287,6 @@ func TestTransformer(t *testing.T) {
cfg.CombineWith = ""
cfg.IsLastEntry = "body.logtag == 'F'"
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand Down Expand Up @@ -501,18 +500,19 @@ func TestTransformer(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
op, err := tc.config.Build(testutil.Logger(t))
require.NoError(t, err)
require.NoError(t, op.Start(testutil.NewUnscopedMockPersister()))
defer func() { require.NoError(t, op.Stop()) }()
recombine := op.(*Transformer)
r := op.(*Transformer)

fake := testutil.NewFakeOutput(t)
err = recombine.SetOutputs([]operator.Operator{fake})
err = r.SetOutputs([]operator.Operator{fake})
require.NoError(t, err)

for _, e := range tc.input {
require.NoError(t, recombine.Process(context.Background(), e))
require.NoError(t, r.Process(ctx, e))
}

fake.ExpectEntries(t, tc.expectedOutput)
Expand Down
Loading