From 8aa22471f278ca1aeda01657cba7a9b3883eb0e9 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 7 Dec 2020 21:36:56 -0500 Subject: [PATCH 1/3] Added test for flusher timeout --- operator/flusher/flusher.go | 12 ++++++++---- operator/flusher/flusher_test.go | 21 ++++++++++++++++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/operator/flusher/flusher.go b/operator/flusher/flusher.go index 953431f42..c19c3ccf7 100644 --- a/operator/flusher/flusher.go +++ b/operator/flusher/flusher.go @@ -11,6 +11,10 @@ import ( "golang.org/x/sync/semaphore" ) +// These are vars so they can be overridden in tests +var maxRetryInterval = time.Second +var maxElapsedTime = 5 * time.Second + // Config holds the configuration to build a new flusher type Config struct { // MaxConcurrent is the maximum number of goroutines flushing entries concurrently. @@ -55,10 +59,10 @@ type Flusher struct { *zap.SugaredLogger } +// FlushFunc is any function that flushes type FlushFunc func(context.Context) error -// TODO error enumeration - +// Do executes the flusher function in a goroutine func (f *Flusher) Do(flush FlushFunc) { // Wait until we have free flusher goroutines if err := f.sem.Acquire(f.ctx, 1); err != nil { @@ -127,8 +131,8 @@ func newExponentialBackoff() *backoff.ExponentialBackOff { InitialInterval: 50 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: 10 * time.Minute, - MaxElapsedTime: time.Duration(0), + MaxInterval: maxRetryInterval, + MaxElapsedTime: maxElapsedTime, Stop: backoff.Stop, Clock: backoff.SystemClock, } diff --git a/operator/flusher/flusher_test.go b/operator/flusher/flusher_test.go index f43a63079..c371da389 100644 --- a/operator/flusher/flusher_test.go +++ b/operator/flusher/flusher_test.go @@ -12,6 +12,10 @@ import ( ) func TestFlusher(t *testing.T) { + + // Override setting for test + maxElapsedTime = 5 * time.Second + outChan := make(chan struct{}, 100) flusherCfg := NewConfig() flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar()) @@ -30,9 +34,24 @@ func TestFlusher(t *testing.T) { for i := 0; i < 100; i++ { select { - case <-time.After(time.Second): + case <-time.After(5 * time.Second): require.FailNow(t, "timed out") case <-outChan: } } } + +func TestMaxElapsedTime(t *testing.T) { + + // Override setting for test + maxElapsedTime = 100 * time.Millisecond + + flusherCfg := NewConfig() + flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar()) + + start := time.Now() + flusher.flushWithRetry(context.Background(), func(_ context.Context) error { + return errors.New("never flushes") + }) + require.WithinDuration(t, start.Add(maxElapsedTime), time.Now(), maxElapsedTime) +} From 1cc9c71ae45d26d94df9cc4404140bb67fc5eb82 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 8 Dec 2020 08:59:35 -0500 Subject: [PATCH 2/3] Updated changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e81dd22fe..23f7c86ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.13.5] - 2020-12-08 +### Fixed +- Issue where flushers would retry indefinitely + ## [0.13.4] - 2020-12-07 ### Added - Recombine operator to combine multiline logs after ingestion and parsing From b0b31fb4ae60e06dc487d487b67e808e9e8a9339 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 8 Dec 2020 11:01:58 -0500 Subject: [PATCH 3/3] Extend flusher timeout to one hour --- operator/flusher/flusher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/flusher/flusher.go b/operator/flusher/flusher.go index c19c3ccf7..c2ce969c8 100644 --- a/operator/flusher/flusher.go +++ b/operator/flusher/flusher.go @@ -12,8 +12,8 @@ import ( ) // These are vars so they can be overridden in tests -var maxRetryInterval = time.Second -var maxElapsedTime = 5 * time.Second +var maxRetryInterval = time.Minute +var maxElapsedTime = time.Hour // Config holds the configuration to build a new flusher type Config struct {