diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7be68d706807..16ebbe39f994 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -90,6 +90,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Enforce validation for the Central Management access token. {issue}9621[9621] - Fix config appender registration. {pull}9873[9873] - Gracefully handle TLS options when enrolling a Beat. {issue}9129[9129] +- The backing off now implements jitter to better distribute the load. {issue}10172[10172] *Auditbeat* diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 9df403ef0958..904ee64d927f 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/logp" ) @@ -44,7 +45,7 @@ type Reader struct { config Config done chan struct{} logger *logp.Logger - backoff *common.Backoff + backoff backoff.Backoff } // New creates a new journal reader and moves the FP to the configured position. @@ -98,7 +99,7 @@ func newReader(logger *logp.Logger, done chan struct{}, c Config, journal *sdjou config: c, done: done, logger: logger, - backoff: common.NewBackoff(done, c.Backoff, c.MaxBackoff), + backoff: backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff), } r.seek(state.Cursor) diff --git a/libbeat/common/backoff/backoff.go b/libbeat/common/backoff/backoff.go new file mode 100644 index 000000000000..a027fb51a461 --- /dev/null +++ b/libbeat/common/backoff/backoff.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package backoff + +// Backoff defines the interface for backoff strategies. +type Backoff interface { + Wait() bool + Reset() +} + +// WaitOnError is a convenience method, if an error is received it will block, if not errors is +// received, the backoff will be resetted. +func WaitOnError(b Backoff, err error) bool { + if err == nil { + b.Reset() + return true + } + return b.Wait() +} diff --git a/libbeat/common/backoff/backoff_test.go b/libbeat/common/backoff/backoff_test.go new file mode 100644 index 000000000000..1fca197e57d2 --- /dev/null +++ b/libbeat/common/backoff/backoff_test.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package backoff + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type factory func(<-chan struct{}) Backoff + +func TestBackoff(t *testing.T) { + t.Run("test close channel", testCloseChannel) + t.Run("test unblock after some time", testUnblockAfterInit) +} + +func testCloseChannel(t *testing.T) { + init := 2 * time.Second + max := 5 * time.Minute + + tests := map[string]factory{ + "ExpBackoff": func(done <-chan struct{}) Backoff { + return NewExpBackoff(done, init, max) + }, + "EqualJitterBackoff": func(done <-chan struct{}) Backoff { + return NewEqualJitterBackoff(done, init, max) + }, + } + + for name, f := range tests { + t.Run(name, func(t *testing.T) { + c := make(chan struct{}) + b := f(c) + close(c) + assert.False(t, b.Wait()) + }) + } +} + +func testUnblockAfterInit(t *testing.T) { + init := 1 * time.Second + max := 5 * time.Minute + + tests := map[string]factory{ + "ExpBackoff": func(done <-chan struct{}) Backoff { + return NewExpBackoff(done, init, max) + }, + "EqualJitterBackoff": func(done <-chan struct{}) Backoff { + return NewEqualJitterBackoff(done, init, max) + }, + } + + for name, f := range tests { + t.Run(name, func(t *testing.T) { + c := make(chan struct{}) + defer close(c) + + b := f(c) + + startedAt := time.Now() + assert.True(t, WaitOnError(b, errors.New("bad bad"))) + assert.True(t, time.Now().Sub(startedAt) >= init) + }) + } +} diff --git a/libbeat/common/backoff/equal_jitter.go b/libbeat/common/backoff/equal_jitter.go new file mode 100644 index 000000000000..ff5c86f156fb --- /dev/null +++ b/libbeat/common/backoff/equal_jitter.go @@ -0,0 +1,73 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package backoff + +import ( + "math/rand" + "time" +) + +// EqualJitterBackoff implements an equal jitter strategy, meaning the wait time will consist of two parts, +// the first will be exponential and the other half will be random and will provide the jitter +// necessary to distribute the wait on remote endpoint. +type EqualJitterBackoff struct { + duration time.Duration + done <-chan struct{} + + init time.Duration + max time.Duration + + last time.Time +} + +// NewEqualJitterBackoff returns a new EqualJitter object. +func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backoff { + return &EqualJitterBackoff{ + duration: init * 2, // Allow to sleep at least the init period on the first wait. + done: done, + init: init, + max: max, + } +} + +// Reset resets the duration of the backoff. +func (b *EqualJitterBackoff) Reset() { + // Allow to sleep at least the init period on the first wait. + b.duration = b.init * 2 +} + +// Wait block until either the timer is completed or channel is done. +func (b *EqualJitterBackoff) Wait() bool { + // Make sure we have always some minimal back off and jitter. + temp := int64(b.duration / 2) + backoff := time.Duration(temp + rand.Int63n(temp)) + + // increase duration for next wait. + b.duration *= 2 + if b.duration > b.max { + b.duration = b.max + } + + select { + case <-b.done: + return false + case <-time.After(backoff): + b.last = time.Now() + return true + } +} diff --git a/libbeat/common/backoff.go b/libbeat/common/backoff/exponential.go similarity index 64% rename from libbeat/common/backoff.go rename to libbeat/common/backoff/exponential.go index 575bb333e6bd..101e66f6e74b 100644 --- a/libbeat/common/backoff.go +++ b/libbeat/common/backoff/exponential.go @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -package common +package backoff -import "time" +import ( + "time" +) -// A Backoff waits on errors with exponential backoff (limited by maximum -// backoff). Resetting Backoff will reset the next sleep timer to the initial -// backoff duration. -type Backoff struct { +// ExpBackoff exponential backoff, will wait an initial time and exponentialy +// increases the wait time up to a predefined maximun. Resetting Backoff will reset the next sleep +// timer to the initial backoff duration. +type ExpBackoff struct { duration time.Duration done <-chan struct{} @@ -32,8 +34,9 @@ type Backoff struct { last time.Time } -func NewBackoff(done <-chan struct{}, init, max time.Duration) *Backoff { - return &Backoff{ +// NewExpBackoff returns a new exponential backoff. +func NewExpBackoff(done <-chan struct{}, init, max time.Duration) Backoff { + return &ExpBackoff{ duration: init, done: done, init: init, @@ -41,11 +44,13 @@ func NewBackoff(done <-chan struct{}, init, max time.Duration) *Backoff { } } -func (b *Backoff) Reset() { +// Reset resets the duration of the backoff. +func (b *ExpBackoff) Reset() { b.duration = b.init } -func (b *Backoff) Wait() bool { +// Wait block until either the timer is completed or channel is done. +func (b *ExpBackoff) Wait() bool { backoff := b.duration b.duration *= 2 if b.duration > b.max { @@ -60,24 +65,3 @@ func (b *Backoff) Wait() bool { return true } } - -func (b *Backoff) WaitOnError(err error) bool { - if err == nil { - b.Reset() - return true - } - return b.Wait() -} - -func (b *Backoff) TryWaitOnError(failTS time.Time, err error) bool { - if err == nil { - b.Reset() - return true - } - - if failTS.Before(b.last) { - return true - } - - return b.Wait() -} diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 1484bc42c001..dca02fc88fc5 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -21,7 +21,7 @@ import ( "errors" "time" - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/testing" ) @@ -30,13 +30,13 @@ type backoffClient struct { client NetworkClient done chan struct{} - backoff *common.Backoff + backoff backoff.Backoff } // WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed. func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { done := make(chan struct{}) - backoff := common.NewBackoff(done, init, max) + backoff := backoff.NewEqualJitterBackoff(done, init, max) return &backoffClient{ client: client, done: done, @@ -46,7 +46,7 @@ func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { func (b *backoffClient) Connect() error { err := b.client.Connect() - b.backoff.WaitOnError(err) + backoff.WaitOnError(b.backoff, err) return err } @@ -61,7 +61,7 @@ func (b *backoffClient) Publish(batch publisher.Batch) error { if err != nil { b.client.Close() } - b.backoff.WaitOnError(err) + backoff.WaitOnError(b.backoff, err) return err } diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index 7c63574d92fd..3084522f6af7 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -22,7 +22,7 @@ import ( "github.com/garyburd/redigo/redis" - "github.com/elastic/beats/libbeat/common" + b "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/publisher" ) @@ -32,7 +32,7 @@ type backoffClient struct { reason failReason done chan struct{} - backoff *common.Backoff + backoff b.Backoff } // failReason is used to track the cause of an error. @@ -51,7 +51,7 @@ const ( func newBackoffClient(client *client, init, max time.Duration) *backoffClient { done := make(chan struct{}) - backoff := common.NewBackoff(done, init, max) + backoff := b.NewEqualJitterBackoff(done, init, max) return &backoffClient{ client: client, done: done, diff --git a/x-pack/functionbeat/licenser/manager.go b/x-pack/functionbeat/licenser/manager.go index 6d1b08dc8e52..632d988a399f 100644 --- a/x-pack/functionbeat/licenser/manager.go +++ b/x-pack/functionbeat/licenser/manager.go @@ -14,7 +14,7 @@ import ( "github.com/gofrs/uuid" - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/logp" ) @@ -253,7 +253,7 @@ func (m *Manager) worker() { } func (m *Manager) update() { - backoff := common.NewBackoff(m.done, initBackoff, maxBackoff) + backoff := backoff.NewEqualJitterBackoff(m.done, initBackoff, maxBackoff) startedAt := time.Now() for { select {