Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flusher Timeout #231

Merged
merged 5 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.13.5] - Unreleased
## [0.13.5] - 2020-12-09
### Fixed
- Issue where flushers would retry indefinitely
- Issue where flushers would improperly reuse the same http request multiple times

## [0.13.4] - 2020-12-07
Expand Down
12 changes: 8 additions & 4 deletions operator/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"golang.org/x/sync/semaphore"
)

// These are vars so they can be overridden in tests
var maxRetryInterval = time.Minute
var maxElapsedTime = time.Hour

// Config holds the configuration to build a new flusher
type Config struct {
// MaxConcurrent is the maximum number of goroutines flushing entries concurrently.
Expand Down Expand Up @@ -55,10 +59,10 @@ type Flusher struct {
*zap.SugaredLogger
}

// FlushFunc is any function that flushes
type FlushFunc func(context.Context) error

// TODO error enumeration

// Do executes the flusher function in a goroutine
func (f *Flusher) Do(flush FlushFunc) {
// Wait until we have free flusher goroutines
if err := f.sem.Acquire(f.ctx, 1); err != nil {
Expand Down Expand Up @@ -127,8 +131,8 @@ func newExponentialBackoff() *backoff.ExponentialBackOff {
InitialInterval: 50 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Minute,
MaxElapsedTime: time.Duration(0),
MaxInterval: maxRetryInterval,
MaxElapsedTime: maxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
Expand Down
21 changes: 20 additions & 1 deletion operator/flusher/flusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
)

func TestFlusher(t *testing.T) {

// Override setting for test
maxElapsedTime = 5 * time.Second

outChan := make(chan struct{}, 100)
flusherCfg := NewConfig()
flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar())
Expand All @@ -30,9 +34,24 @@ func TestFlusher(t *testing.T) {

for i := 0; i < 100; i++ {
select {
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out")
case <-outChan:
}
}
}

func TestMaxElapsedTime(t *testing.T) {

// Override setting for test
maxElapsedTime = 100 * time.Millisecond

flusherCfg := NewConfig()
flusher := flusherCfg.Build(zaptest.NewLogger(t).Sugar())

start := time.Now()
flusher.flushWithRetry(context.Background(), func(_ context.Context) error {
return errors.New("never flushes")
})
require.WithinDuration(t, start.Add(maxElapsedTime), time.Now(), maxElapsedTime)
}