Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatch: don't reset timer if flush is in-progress #1301

Merged
merged 1 commit into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ type aggrGroup struct {
next *time.Timer
timeout func(time.Duration) time.Duration

mtx sync.RWMutex
alerts map[model.Fingerprint]*types.Alert
hasSent bool
mtx sync.RWMutex
alerts map[model.Fingerprint]*types.Alert
hasFlushed bool
}

// newAggrGroup returns a new aggregation group.
Expand Down Expand Up @@ -366,6 +366,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
Expand Down Expand Up @@ -396,7 +397,7 @@ func (ag *aggrGroup) insert(alert *types.Alert) {

// Immediately trigger a flush if the wait duration for this
// alert is already over.
if !ag.hasSent && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)
}
}
Expand Down Expand Up @@ -457,8 +458,6 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
delete(ag.alerts, fp)
}
}

ag.hasSent = true
ag.mtx.Unlock()
}
}
7 changes: 3 additions & 4 deletions test/acceptance/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,9 @@ receivers:
func TestReload(t *testing.T) {
t.Parallel()

// We create a notification config that fans out into two different
// webhooks.
// The succeeding one must still only receive the first successful
// notifications. Sending to the succeeding one must eventually succeed.
// This integration test ensures that the first alert isn't notified twice
// and repeat_interval applies after the AlertManager process has been
// reloaded.
conf := `
route:
receiver: "default"
Expand Down