diff --git a/CHANGELOG.md b/CHANGELOG.md index 51c6e6608..05a0f0561 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.13.5] - Unreleased +## [0.13.5] - 2020-12-09 ### Fixed +- Issue where flushers would retry indefinitely - Issue where flushers would improperly reuse the same http request multiple times ## [0.13.4] - 2020-12-07 diff --git a/operator/flusher/flusher.go b/operator/flusher/flusher.go index 953431f42..c2ce969c8 100644 --- a/operator/flusher/flusher.go +++ b/operator/flusher/flusher.go @@ -11,6 +11,10 @@ import ( "golang.org/x/sync/semaphore" ) +// These are vars so they can be overridden in tests +var maxRetryInterval = time.Minute +var maxElapsedTime = time.Hour + // Config holds the configuration to build a new flusher type Config struct { // MaxConcurrent is the maximum number of goroutines flushing entries concurrently. @@ -55,10 +59,10 @@ type Flusher struct { *zap.SugaredLogger } +// FlushFunc is any function that flushes type FlushFunc func(context.Context) error -// TODO error enumeration - +// Do executes the flusher function in a goroutine func (f *Flusher) Do(flush FlushFunc) { // Wait until we have free flusher goroutines if err := f.sem.Acquire(f.ctx, 1); err != nil { @@ -127,8 +131,8 @@ func newExponentialBackoff() *backoff.ExponentialBackOff { InitialInterval: 50 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: 10 * time.Minute, - MaxElapsedTime: time.Duration(0), + MaxInterval: maxRetryInterval, + MaxElapsedTime: maxElapsedTime, Stop: backoff.Stop, Clock: backoff.SystemClock, } diff --git a/operator/flusher/flusher_test.go b/operator/flusher/flusher_test.go index f43a63079..c371da389 100644 --- a/operator/flusher/flusher_test.go +++ b/operator/flusher/flusher_test.go @@ -12,6 +12,10 @@ import ( ) func TestFlusher(t *testing.T) { + + // Override setting for test + maxElapsedTime = 5 * time.Second + outChan := make(chan struct{}, 100) flusherCfg := NewConfig() flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar()) @@ -30,9 +34,24 @@ func TestFlusher(t *testing.T) { for i := 0; i < 100; i++ { select { - case <-time.After(time.Second): + case <-time.After(5 * time.Second): require.FailNow(t, "timed out") case <-outChan: } } } + +func TestMaxElapsedTime(t *testing.T) { + + // Override setting for test + maxElapsedTime = 100 * time.Millisecond + + flusherCfg := NewConfig() + flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar()) + + start := time.Now() + flusher.flushWithRetry(context.Background(), func(_ context.Context) error { + return errors.New("never flushes") + }) + require.WithinDuration(t, start.Add(maxElapsedTime), time.Now(), maxElapsedTime) +}