Skip to content

Commit

Permalink
Flusher Timeout (#231)
Browse files Browse the repository at this point in the history
* Set a finite flusher timeout
  • Loading branch information
djaglowski authored Dec 10, 2020
1 parent 8ed70f8 commit e91ef0c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.13.5] - Unreleased
## [0.13.5] - 2020-12-09
### Fixed
- Issue where flushers would retry indefinitely
- Issue where flushers would improperly reuse the same http request multiple times

## [0.13.4] - 2020-12-07
Expand Down
12 changes: 8 additions & 4 deletions operator/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"golang.org/x/sync/semaphore"
)

// These are vars so they can be overridden in tests
var maxRetryInterval = time.Minute
var maxElapsedTime = time.Hour

// Config holds the configuration to build a new flusher
type Config struct {
// MaxConcurrent is the maximum number of goroutines flushing entries concurrently.
Expand Down Expand Up @@ -55,10 +59,10 @@ type Flusher struct {
*zap.SugaredLogger
}

// FlushFunc is any function that flushes
type FlushFunc func(context.Context) error

// TODO error enumeration

// Do executes the flusher function in a goroutine
func (f *Flusher) Do(flush FlushFunc) {
// Wait until we have free flusher goroutines
if err := f.sem.Acquire(f.ctx, 1); err != nil {
Expand Down Expand Up @@ -127,8 +131,8 @@ func newExponentialBackoff() *backoff.ExponentialBackOff {
InitialInterval: 50 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Minute,
MaxElapsedTime: time.Duration(0),
MaxInterval: maxRetryInterval,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
Expand Down
21 changes: 20 additions & 1 deletion operator/flusher/flusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
)

func TestFlusher(t *testing.T) {

// Override setting for test
maxElapsedTime = 5 * time.Second

outChan := make(chan struct{}, 100)
flusherCfg := NewConfig()
flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar())
Expand All @@ -30,9 +34,24 @@ func TestFlusher(t *testing.T) {

for i := 0; i < 100; i++ {
select {
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out")
case <-outChan:
}
}
}

func TestMaxElapsedTime(t *testing.T) {

// Override setting for test
maxElapsedTime = 100 * time.Millisecond

flusherCfg := NewConfig()
flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar())

start := time.Now()
flusher.flushWithRetry(context.Background(), func(_ context.Context) error {
return errors.New("never flushes")
})
require.WithinDuration(t, start.Add(maxElapsedTime), time.Now(), maxElapsedTime)
}

0 comments on commit e91ef0c

Please sign in to comment.