Skip to content

Commit

Permalink
[Agent] Improved cancellation of agent (#17318)
Browse files Browse the repository at this point in the history
[Agent] Improved cancellation of agent (#17318)
  • Loading branch information
michalpristas authored Apr 14, 2020
1 parent 4f6da4f commit 65b5255
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 35 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Fixed installers for SNAPSHOTs and windows {pull}17077[17077]
- Fixed merge of config {pull}17399[17399]
- Handle abs paths on windows correctly {pull}17461[17461]
- Improved cancellation of agent {pull}17318[17318]

==== New features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (ad *actionDispatcher) Dispatch(acker fleetAcker, actions ...action) error
)

for _, action := range actions {
if err := ad.ctx.Err(); err != nil {
return err
}

if err := ad.dispatchAction(action, acker); err != nil {
ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err)
return err
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (f *fleetGateway) worker() {

func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.backoff.Reset()
for {
for f.bgContext.Err() == nil {
// TODO: wrap with timeout context
resp, err := f.execute(f.bgContext)
if err != nil {
Expand All @@ -171,6 +171,8 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
}
return resp, nil
}

return nil, f.bgContext.Err()
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (m *Managed) Start() error {
// Stop stops a managed elastic-agent.
func (m *Managed) Stop() error {
defer m.log.Info("Agent is stopped")
m.gateway.Stop()
m.cancelCtxFn()
m.gateway.Stop()
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func (o *Operator) runFlow(p Descriptor, operations []operation) error {
}

for _, op := range operations {
if err := o.bgContext.Err(); err != nil {
return err
}

shouldRun, err := op.Check()
if err != nil {
return err
Expand Down Expand Up @@ -252,7 +256,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {

monitor := monitoring.NewMonitor(isMonitorable(p), p.BinaryName(), o.pipelineID, o.config.DownloadConfig, o.config.MonitoringConfig.MonitorLogs, o.config.MonitoringConfig.MonitorMetrics)

a, err := app.NewApplication(p.ID(), p.BinaryName(), o.pipelineID, specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing, monitor)
a, err := app.NewApplication(o.bgContext, p.ID(), p.BinaryName(), o.pipelineID, specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing, monitor)
if err != nil {
return nil, err
}
Expand Down
33 changes: 28 additions & 5 deletions x-pack/elastic-agent/pkg/core/plugin/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ReportFailureFunc func(context.Context, string, error)

// Application encapsulates a concrete application run by elastic-agent, e.g. a Beat.
type Application struct {
bgContext context.Context
id string
name string
pipelineID string
Expand Down Expand Up @@ -68,6 +69,7 @@ type ArgsDecorator func([]string) []string
// NewApplication creates a new instance of an application. It will not automatically start
// the application.
func NewApplication(
ctx context.Context,
id, appName, pipelineID string,
spec Specifier,
factory remoteconfig.ConnectionCreator,
Expand All @@ -82,8 +84,9 @@ func NewApplication(
return nil, err
}

b, _ := tokenbucket.NewTokenBucket(3, 3, 1*time.Second)
b, _ := tokenbucket.NewTokenBucket(ctx, 3, 3, 1*time.Second)
return &Application{
bgContext: ctx,
id: id,
name: appName,
pipelineID: pipelineID,
Expand Down Expand Up @@ -150,10 +153,14 @@ func (a *Application) State() state.State {

func (a *Application) watch(ctx context.Context, proc *os.Process, cfg map[string]interface{}) {
go func() {
procState, err := proc.Wait()
if err != nil {
// process is not a child - some OSs requires process to be child
a.externalProcess(proc)
var procState *os.ProcessState

select {
case ps := <-a.waitProc(proc):
procState = ps
case <-a.bgContext.Done():
a.Stop()
return
}

a.appLock.Lock()
Expand All @@ -175,6 +182,22 @@ func (a *Application) watch(ctx context.Context, proc *os.Process, cfg map[strin
}()
}

// waitProc waits for proc to exit on a separate goroutine and returns a
// channel on which the resulting process state is delivered exactly once.
//
// The channel is buffered with capacity one so the waiting goroutine can
// always deliver its result and exit, even when the caller has already
// stopped listening (for example because it selected on a cancelled context
// instead). With an unbuffered channel that goroutine would block on the send
// forever and leak.
//
// The delivered state is whatever proc.Wait returned; when Wait fails the
// value sent may be nil, so receivers should be prepared for that.
func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
	resChan := make(chan *os.ProcessState, 1)

	go func() {
		procState, err := proc.Wait()
		if err != nil {
			// process is not a child - some OSs require the process to be a child
			a.externalProcess(proc)
		}

		// Never blocks: the buffer has room for the single value sent here.
		resChan <- procState
	}()

	return resChan
}

func (a *Application) reportCrash(ctx context.Context) {
a.monitor.Cleanup()

Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/core/plugin/app/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface
return errors.New(ErrAppNotRunning)
}

retryFn := func() error {
retryFn := func(ctx context.Context) error {
a.appLock.Lock()
defer a.appLock.Unlock()

Expand Down Expand Up @@ -79,5 +79,5 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface
return retry.ErrorMakeFatal(err)
}

return retry.Do(a.retryConfig, retryFn)
return retry.Do(ctx, a.retryConfig, retryFn)
}
15 changes: 12 additions & 3 deletions x-pack/elastic-agent/pkg/core/plugin/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (a *Application) waitForGrpc(spec ProcessSpec, ca *authority.CertificateAut

for round := 1; round <= rounds; round++ {
for retry := 1; retry <= retries; retry++ {
c, cancelFn := context.WithTimeout(context.Background(), retryTimeout)
c, cancelFn := context.WithTimeout(a.bgContext, retryTimeout)
err := checkFn(c, a.state.ProcessInfo.Address)
if err == nil {
cancelFn()
Expand All @@ -145,12 +145,21 @@ func (a *Application) waitForGrpc(spec ProcessSpec, ca *authority.CertificateAut

// do not wait on last
if retry != retries {
<-time.After(retryTimeout)
select {
case <-time.After(retryTimeout):
case <-a.bgContext.Done():
return nil
}
}
}

// do not wait on last
if round != rounds {
time.After(time.Duration(round) * roundsTimeout)
select {
case <-time.After(time.Duration(round) * roundsTimeout):
case <-a.bgContext.Done():
return nil
}
}
}

Expand Down
15 changes: 12 additions & 3 deletions x-pack/elastic-agent/pkg/core/plugin/retry/retrystrategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package retry

import (
"context"
"time"

"github.com/elastic/beats/v7/libbeat/common/backoff"
Expand All @@ -31,12 +32,16 @@ func DoWithBackoff(config *Config, b backoff.Backoff, fn func() error, fatalErro
}

// Do runs provided function in a manner specified in retry configuration
func Do(config *Config, fn func() error, fatalErrors ...error) error {
func Do(ctx context.Context, config *Config, fn func(ctx context.Context) error, fatalErrors ...error) error {
retryCount := getRetryCount(config)
var err error

for retryNo := 0; retryNo <= retryCount; retryNo++ {
err = fn()
if ctx.Err() != nil {
break
}

err = fn(ctx)
if err == nil {
return nil
}
Expand All @@ -46,7 +51,11 @@ func Do(config *Config, fn func() error, fatalErrors ...error) error {
}

if retryNo < retryCount {
<-time.After(getDelayDuration(config, retryNo))
select {
case <-time.After(getDelayDuration(config, retryNo)):
case <-ctx.Done():
break
}
}
}

Expand Down
31 changes: 16 additions & 15 deletions x-pack/elastic-agent/pkg/core/plugin/retry/retrystrategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package retry

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -14,7 +15,7 @@ import (

func TestRetry(t *testing.T) {
type testCase struct {
Fn func() error
Fn func(context.Context) error
ExpectedExecutions int64
IsErrExpected bool
Enabled bool
Expand All @@ -25,19 +26,19 @@ func TestRetry(t *testing.T) {
var executions int64

testCases := map[string]testCase{
"not-failing": testCase{Fn: func() error { executions++; return nil }, ExpectedExecutions: 1, Enabled: true},
"failing": testCase{Fn: func() error { executions++; return errors.New("fail") }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true},
"fatal-by-enum": testCase{Fn: func() error { executions++; return errFatal }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: true},
"fatal-by-iface": testCase{Fn: func() error { executions++; return ErrFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: true},
"not-fatal-by-iface": testCase{Fn: func() error { executions++; return ErrNotFatal{} }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true},

"dis-not-failing": testCase{Fn: func() error { executions++; return nil }, ExpectedExecutions: 1, Enabled: false},
"dis-failing": testCase{Fn: func() error { executions++; return errors.New("fail") }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-fatal-by-enum": testCase{Fn: func() error { executions++; return errFatal }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-fatal-by-iface": testCase{Fn: func() error { executions++; return ErrFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-not-fatal-by-iface": testCase{Fn: func() error { executions++; return ErrNotFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},

"failing-exp": testCase{Fn: func() error { executions++; return errors.New("fail") }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true, Exponential: true},
"not-failing": testCase{Fn: func(_ context.Context) error { executions++; return nil }, ExpectedExecutions: 1, Enabled: true},
"failing": testCase{Fn: func(_ context.Context) error { executions++; return errors.New("fail") }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true},
"fatal-by-enum": testCase{Fn: func(_ context.Context) error { executions++; return errFatal }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: true},
"fatal-by-iface": testCase{Fn: func(_ context.Context) error { executions++; return ErrFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: true},
"not-fatal-by-iface": testCase{Fn: func(_ context.Context) error { executions++; return ErrNotFatal{} }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true},

"dis-not-failing": testCase{Fn: func(_ context.Context) error { executions++; return nil }, ExpectedExecutions: 1, Enabled: false},
"dis-failing": testCase{Fn: func(_ context.Context) error { executions++; return errors.New("fail") }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-fatal-by-enum": testCase{Fn: func(_ context.Context) error { executions++; return errFatal }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-fatal-by-iface": testCase{Fn: func(_ context.Context) error { executions++; return ErrFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},
"dis-not-fatal-by-iface": testCase{Fn: func(_ context.Context) error { executions++; return ErrNotFatal{} }, ExpectedExecutions: 1, IsErrExpected: true, Enabled: false},

"failing-exp": testCase{Fn: func(_ context.Context) error { executions++; return errors.New("fail") }, ExpectedExecutions: 4, IsErrExpected: true, Enabled: true, Exponential: true},
}

config := &Config{
Expand All @@ -52,7 +53,7 @@ func TestRetry(t *testing.T) {
config.Exponential = tc.Exponential

startTime := time.Now()
err := Do(config, testFn, errFatal)
err := Do(context.Background(), config, testFn, errFatal)

executionTime := time.Since(startTime)
minExecutionTime := getMinExecutionTime(config.Delay, tc.ExpectedExecutions, tc.Exponential)
Expand Down
12 changes: 8 additions & 4 deletions x-pack/elastic-agent/pkg/tokenbucket/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package tokenbucket

import (
"context"
"fmt"
"time"

Expand All @@ -24,12 +25,13 @@ type Bucket struct {
// size: total size of the bucket
// dropAmount: amount which is dropped per every specified interval
// dropRate: specified interval when drop will happen
func NewTokenBucket(size, dropAmount int, dropRate time.Duration) (*Bucket, error) {
func NewTokenBucket(ctx context.Context, size, dropAmount int, dropRate time.Duration) (*Bucket, error) {
s := scheduler.NewPeriodic(dropRate)
return newTokenBucketWithScheduler(size, dropAmount, s)
return newTokenBucketWithScheduler(ctx, size, dropAmount, s)
}

func newTokenBucketWithScheduler(
ctx context.Context,
size, dropAmount int,
s scheduler.Scheduler,
) (*Bucket, error) {
Expand All @@ -47,7 +49,7 @@ func newTokenBucketWithScheduler(
closeChan: make(chan struct{}),
scheduler: s,
}
go b.run()
go b.run(ctx)

return b, nil
}
Expand All @@ -65,7 +67,7 @@ func (b *Bucket) Close() {
}

// run runs basic loop and consumes configured tokens per every configured period.
func (b *Bucket) run() {
func (b *Bucket) run(ctx context.Context) {
for {
select {
case <-b.scheduler.WaitTick():
Expand All @@ -77,6 +79,8 @@ func (b *Bucket) run() {
}
case <-b.closeChan:
return
case <-ctx.Done():
return
}
}
}
5 changes: 5 additions & 0 deletions x-pack/elastic-agent/pkg/tokenbucket/token_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package tokenbucket

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -22,6 +23,7 @@ func TestTokenBucket(t *testing.T) {
stepper := scheduler.NewStepper()

b, err := newTokenBucketWithScheduler(
context.Background(),
bucketSize,
dropAmount,
stepper,
Expand All @@ -37,6 +39,7 @@ func TestTokenBucket(t *testing.T) {
stepper := scheduler.NewStepper()

b, err := newTokenBucketWithScheduler(
context.Background(),
bucketSize,
dropAmount,
stepper,
Expand All @@ -53,6 +56,7 @@ func TestTokenBucket(t *testing.T) {
stepper := scheduler.NewStepper()

b, err := newTokenBucketWithScheduler(
context.Background(),
bucketSize,
dropAmount,
stepper,
Expand Down Expand Up @@ -88,6 +92,7 @@ func TestTokenBucket(t *testing.T) {
t.Run("When we use a timer scheduler we can unblock", func(t *testing.T) {
d := 1 * time.Second
b, err := NewTokenBucket(
context.Background(),
bucketSize,
dropAmount,
d,
Expand Down

0 comments on commit 65b5255

Please sign in to comment.