Skip to content

Commit

Permalink
[pkg/stanza/operator/transformer/recombine] fix the flaky test(#20877) (
Browse files Browse the repository at this point in the history
#20899)

Due to the use of goroutines, it's not suitable for some accurate tests for time.
  • Loading branch information
yutingcaicyt authored Apr 24, 2023
1 parent 691044b commit 39681e6
Showing 1 changed file with 2 additions and 46 deletions.
48 changes: 2 additions & 46 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,64 +492,20 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) {
ctx := context.Background()

require.NoError(t, recombine.Start(nil))
require.NoError(t, recombine.Process(ctx, e))

go func() {
require.NoError(t, recombine.Process(ctx, e))
next := e.Copy()
next.Body = "next"
for {
time.Sleep(cfg.ForceFlushTimeout / 2)
require.NoError(t, recombine.Process(ctx, next))
}
}()
select {
case <-fake.Received:
case <-time.After(200 * time.Millisecond):
t.Logf("The entry should be flushed by now")
t.FailNow()
}
require.NoError(t, recombine.Stop())
}

// This test is to make sure the max aggregation = the period(1/5 forceFlushTimeout) of ticker + forceFlushTimeout
func TestMaxAggregationTime(t *testing.T) {
t.Parallel()

cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "body == 'start'"
cfg.CombineWith = ""
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 100 * time.Millisecond
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
recombine := op.(*Transformer)

fake := testutil.NewFakeOutput(t)
require.NoError(t, recombine.SetOutputs([]operator.Operator{fake}))

e := entry.New()
e.Timestamp = time.Now()
e.Body = "start"

ctx := context.Background()

require.NoError(t, recombine.Start(nil))
go func() {
// the period of ticker in flushLoop is 1/5 * forceFlushTimeout = 20 Milliseconds
// make the moment data enter the batch that close to the end of the previous tick
time.Sleep(15 * time.Millisecond)
require.NoError(t, recombine.Process(ctx, e))
next := e.Copy()
next.Body = "next"
for {
require.NoError(t, recombine.Process(ctx, next))
time.Sleep(cfg.ForceFlushTimeout / 2)
}
}()
select {
case <-fake.Received:
case <-time.After(cfg.ForceFlushTimeout * 6 / 5):
case <-time.After(5 * time.Second):
t.Logf("The entry should be flushed by now")
t.FailNow()
}
Expand Down

0 comments on commit 39681e6

Please sign in to comment.