From c9cbaa3a039e359bd45b31479d322dea1851024f Mon Sep 17 00:00:00 2001 From: yutingcaicyt Date: Thu, 30 Mar 2023 16:11:44 +0800 Subject: [PATCH] [pkg/stanza] fix recombine operator timeout issue (#20451) When using recombine operator, the behavior is different with description for force_flush_period. Fixing the issue that the actual timeout is much longer than force_flush_period. In order to make the actual timeout closer to "force_flush_period", set the period of ticker to 1/5 "force_flush_period", so that the entries will be forced to be sent after waiting for at most 6/5 "force_flush_period" --- .chloggen/fixrecombinetimeout.yaml | 15 ++++ .../transformer/recombine/recombine.go | 10 +-- .../transformer/recombine/recombine_test.go | 90 +++++++++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) create mode 100755 .chloggen/fixrecombinetimeout.yaml diff --git a/.chloggen/fixrecombinetimeout.yaml b/.chloggen/fixrecombinetimeout.yaml new file mode 100755 index 000000000000..4118f2305c60 --- /dev/null +++ b/.chloggen/fixrecombinetimeout.yaml @@ -0,0 +1,15 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where recombine operator would never flush. +# One or more tracking issues related to the change +issues: [20451] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: 1.Make the forceFlushTimeout compare with timeSinceFirstEntry not timeSinceLastEntry 2.set the period of ticker to 1/5 forceFlushTimeout diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 5ad0aa162f20..7f6f3d3a388c 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -176,17 +176,17 @@ func (r *Transformer) flushLoop() { timeNow := time.Now() for source, batch := range r.batchMap { entries := batch.entries - lastEntryTs := entries[len(entries)-1].ObservedTimestamp - timeSinceLastEntry := timeNow.Sub(lastEntryTs) - if timeSinceLastEntry < r.forceFlushTimeout { + firstEntryTs := entries[0].ObservedTimestamp + timeSinceFirstEntry := timeNow.Sub(firstEntryTs) + if timeSinceFirstEntry < r.forceFlushTimeout { continue } if err := r.flushSource(source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } } - - r.ticker.Reset(r.forceFlushTimeout) + // check every 1/5 forceFlushTimeout + r.ticker.Reset(r.forceFlushTimeout / 5) r.Unlock() case <-r.chClose: r.ticker.Stop() diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index d3db4d5c4815..11f58f020b13 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -466,3 +466,93 @@ func TestTimeout(t *testing.T) { require.NoError(t, recombine.Stop()) } + +// This test is to make sure the timeout would take effect when there +// are constantly logs that meet the aggregation criteria +func TestTimeoutWhenAggregationKeepHappen(t *testing.T) { + t.Parallel() + + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "body == 'start'" + cfg.CombineWith = "" + cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 100 * time.Millisecond + op, err := cfg.Build(testutil.Logger(t)) + require.NoError(t, err) + recombine := op.(*Transformer) + + fake := testutil.NewFakeOutput(t) + require.NoError(t, recombine.SetOutputs([]operator.Operator{fake})) + + e := entry.New() + e.Timestamp = time.Now() + e.Body = "start" + + ctx := context.Background() + + require.NoError(t, recombine.Start(nil)) + + go func() { + require.NoError(t, recombine.Process(ctx, e)) + next := e.Copy() + next.Body = "next" + for { + time.Sleep(cfg.ForceFlushTimeout / 2) + require.NoError(t, recombine.Process(ctx, next)) + } + }() + select { + case aggregation := <-fake.Received: + require.Equal(t, "startnext", aggregation.Body) + case <-time.After(200 * time.Millisecond): + t.Logf("The entry should be flushed by now") + t.FailNow() + } + require.NoError(t, recombine.Stop()) +} + +// This test is to make sure the max aggregation = the period(1/5 forceFlushTimeout) of ticker + forceFlushTimeout +func TestMaxAggregationTime(t *testing.T) { + t.Parallel() + + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "body == 'start'" + cfg.CombineWith = "" + cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 100 * time.Millisecond + op, err := cfg.Build(testutil.Logger(t)) + require.NoError(t, err) + recombine := op.(*Transformer) + + fake := testutil.NewFakeOutput(t) + require.NoError(t, recombine.SetOutputs([]operator.Operator{fake})) + + e := entry.New() + e.Timestamp = time.Now() + e.Body = "start" + + ctx := context.Background() + + require.NoError(t, recombine.Start(nil)) + go func() { + // the period of ticker in flushLoop is 1/5 * forceFlushTimeout = 20 Milliseconds + // make the moment data enter the batch that close to the end of the previous tick + time.Sleep(15 * time.Millisecond) + require.NoError(t, recombine.Process(ctx, e)) + next := e.Copy() + next.Body = "next" + for { + require.NoError(t, recombine.Process(ctx, next)) + time.Sleep(cfg.ForceFlushTimeout / 2) + } + }() + select { + case <-fake.Received: + case <-time.After(cfg.ForceFlushTimeout * 6 / 5): + t.Logf("The entry should be flushed by now") + t.FailNow() + } + require.NoError(t, recombine.Stop()) +}