Skip to content

Commit

Permalink
New Backoff strategies and types
Browse files Browse the repository at this point in the history
This PR add a new interface called backoff.Backoff, this can be used to
generalize any backoff interaction. It move the current Backoff strategy
under an ExpBackoff type.

ExpBackoff times is the same as before on every wait we just
exponentially increase the duration of the wait and sleep for that
amount.

EqualJitterBackoff uses an exponential increment of the duration but
will take half of that value as a fixed sleep time and the other half
as a jitter. This will help distribute the new request when a cluster is
done instead of having all the beats trying to reconnect at once.

The Redis implementations and any clients wrapped with a backoff will
now use the EqualJitterBackoff, any other code will keep using the same
exponential strategy.

Fixes: #10172
  • Loading branch information
ph committed Jan 21, 2019
1 parent d53fa9f commit 5d7864b
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Enforce validation for the Central Management access token. {issue}9621[9621]
- Fix config appender registration. {pull}9873[9873]
- Gracefully handle TLS options when enrolling a Beat. {issue}9129[9129]
- The backing off now implements jitter to better distribute the load. {issue}10172[10172]

*Auditbeat*

Expand Down
5 changes: 3 additions & 2 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/logp"
)

Expand All @@ -44,7 +45,7 @@ type Reader struct {
config Config
done chan struct{}
logger *logp.Logger
backoff *common.Backoff
backoff backoff.Backoff
}

// New creates a new journal reader and moves the FP to the configured position.
Expand Down Expand Up @@ -98,7 +99,7 @@ func newReader(logger *logp.Logger, done chan struct{}, c Config, journal *sdjou
config: c,
done: done,
logger: logger,
backoff: common.NewBackoff(done, c.Backoff, c.MaxBackoff),
backoff: backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff),
}
r.seek(state.Cursor)

Expand Down
34 changes: 34 additions & 0 deletions libbeat/common/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

// Backoff defines the interface for backoff strategies.
type Backoff interface {
Wait() bool
Reset()
}

// WaitOnError is a convenience method, if an error is received it will block, if not errors is
// received, the backoff will be resetted.
func WaitOnError(b Backoff, err error) bool {
if err == nil {
b.Reset()
return true
}
return b.Wait()
}
83 changes: 83 additions & 0 deletions libbeat/common/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type factory func(<-chan struct{}) Backoff

func TestBackoff(t *testing.T) {
t.Run("test close channel", testCloseChannel)
t.Run("test unblock after some time", testUnblockAfterInit)
}

func testCloseChannel(t *testing.T) {
init := 2 * time.Second
max := 5 * time.Minute

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
return NewExpBackoff(done, init, max)
},
"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
return NewEqualJitterBackoff(done, init, max)
},
}

for name, f := range tests {
t.Run(name, func(t *testing.T) {
c := make(chan struct{})
b := f(c)
close(c)
assert.False(t, b.Wait())
})
}
}

func testUnblockAfterInit(t *testing.T) {
init := 1 * time.Second
max := 5 * time.Minute

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
return NewExpBackoff(done, init, max)
},
"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
return NewEqualJitterBackoff(done, init, max)
},
}

for name, f := range tests {
t.Run(name, func(t *testing.T) {
c := make(chan struct{})
defer close(c)

b := f(c)

startedAt := time.Now()
assert.True(t, WaitOnError(b, errors.New("bad bad")))
assert.True(t, time.Now().Sub(startedAt) >= init)
})
}
}
73 changes: 73 additions & 0 deletions libbeat/common/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

import (
"math/rand"
"time"
)

// EqualJitterBackoff implements an equal jitter strategy, meaning the wait time will consist of two parts,
// the first will be exponential and the other half will be random and will provide the jitter
// necessary to distribute the wait on remote endpoint.
type EqualJitterBackoff struct {
duration time.Duration
done <-chan struct{}

init time.Duration
max time.Duration

last time.Time
}

// NewEqualJitterBackoff returns a new EqualJitter object.
func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backoff {
return &EqualJitterBackoff{
duration: init * 2, // Allow to sleep at least the init period on the first wait.
done: done,
init: init,
max: max,
}
}

// Reset resets the duration of the backoff.
func (b *EqualJitterBackoff) Reset() {
// Allow to sleep at least the init period on the first wait.
b.duration = b.init * 2
}

// Wait block until either the timer is completed or channel is done.
func (b *EqualJitterBackoff) Wait() bool {
// Make sure we have always some minimal back off and jitter.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))

// increase duration for next wait.
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
}

select {
case <-b.done:
return false
case <-time.After(backoff):
b.last = time.Now()
return true
}
}
46 changes: 15 additions & 31 deletions libbeat/common/backoff.go → libbeat/common/backoff/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.

package common
package backoff

import "time"
import (
"time"
)

// A Backoff waits on errors with exponential backoff (limited by maximum
// backoff). Resetting Backoff will reset the next sleep timer to the initial
// backoff duration.
type Backoff struct {
// ExpBackoff exponential backoff, will wait an initial time and exponentialy
// increases the wait time up to a predefined maximun. Resetting Backoff will reset the next sleep
// timer to the initial backoff duration.
type ExpBackoff struct {
duration time.Duration
done <-chan struct{}

Expand All @@ -32,20 +34,23 @@ type Backoff struct {
last time.Time
}

func NewBackoff(done <-chan struct{}, init, max time.Duration) *Backoff {
return &Backoff{
// NewExpBackoff returns a new exponential backoff.
func NewExpBackoff(done <-chan struct{}, init, max time.Duration) Backoff {
return &ExpBackoff{
duration: init,
done: done,
init: init,
max: max,
}
}

func (b *Backoff) Reset() {
// Reset resets the duration of the backoff.
func (b *ExpBackoff) Reset() {
b.duration = b.init
}

func (b *Backoff) Wait() bool {
// Wait block until either the timer is completed or channel is done.
func (b *ExpBackoff) Wait() bool {
backoff := b.duration
b.duration *= 2
if b.duration > b.max {
Expand All @@ -60,24 +65,3 @@ func (b *Backoff) Wait() bool {
return true
}
}

func (b *Backoff) WaitOnError(err error) bool {
if err == nil {
b.Reset()
return true
}
return b.Wait()
}

func (b *Backoff) TryWaitOnError(failTS time.Time, err error) bool {
if err == nil {
b.Reset()
return true
}

if failTS.Before(b.last) {
return true
}

return b.Wait()
}
10 changes: 5 additions & 5 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/testing"
)
Expand All @@ -30,13 +30,13 @@ type backoffClient struct {
client NetworkClient

done chan struct{}
backoff *common.Backoff
backoff backoff.Backoff
}

// WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed.
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {
done := make(chan struct{})
backoff := common.NewBackoff(done, init, max)
backoff := backoff.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
Expand All @@ -46,7 +46,7 @@ func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {

func (b *backoffClient) Connect() error {
err := b.client.Connect()
b.backoff.WaitOnError(err)
backoff.WaitOnError(b.backoff, err)
return err
}

Expand All @@ -61,7 +61,7 @@ func (b *backoffClient) Publish(batch publisher.Batch) error {
if err != nil {
b.client.Close()
}
b.backoff.WaitOnError(err)
backoff.WaitOnError(b.backoff, err)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/redis/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/garyburd/redigo/redis"

"github.com/elastic/beats/libbeat/common"
b "github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/publisher"
)

Expand All @@ -32,7 +32,7 @@ type backoffClient struct {
reason failReason

done chan struct{}
backoff *common.Backoff
backoff b.Backoff
}

// failReason is used to track the cause of an error.
Expand All @@ -51,7 +51,7 @@ const (

func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
done := make(chan struct{})
backoff := common.NewBackoff(done, init, max)
backoff := b.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
Expand Down

0 comments on commit 5d7864b

Please sign in to comment.