From 888968b292a71ebc56fec96ddd96fc95cc4b6c45 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 12:48:40 +0300 Subject: [PATCH 1/7] Add retry logic for not_finished state errors Signed-off-by: chrismark --- filebeat/input/file/state.go | 17 +++++++++ filebeat/input/log/input.go | 2 +- libbeat/autodiscover/autodiscover.go | 37 +++++++++++++++++-- .../autodiscover/providers/kubernetes/pod.go | 1 + libbeat/common/errors.go | 32 ++++++++++++++++ 5 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 libbeat/common/errors.go diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 18fcfee583b..560daf8a7bc 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -18,6 +18,7 @@ package file import ( + "fmt" "os" "time" @@ -66,3 +67,19 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin func (s *State) IsEqual(c *State) bool { return s.Id == c.Id } + +// String returns string representation of the struct +func (s *State) String() string { + return fmt.Sprintf( + "{Id: %v, Finished: %v, Fileinfo: %v, Source: %v, Offset: %v, Timestamp: %v, TTL: %v, Type: %v, Meta: %v, FileStateOS: %v}", + s.Id, + s.Finished, + s.Fileinfo, + s.Source, + s.Offset, + s.Timestamp, + s.TTL, + s.Type, + s.Meta, + s.FileStateOS) +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index d7bd4f18017..365da416ed3 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -175,7 +175,7 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) + return &common.ErrInputNotFinished{State: state.String()} } // Convert state to current identifier if different diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index e26a2521c16..42bed27e91f 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -204,11 +204,8 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.factory.CheckConfig(config) + err = a.checkConfig(config) if err != nil { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s', won't start runner", - common.DebugString(config, true)))) continue } @@ -298,3 +295,35 @@ func (a *Autodiscover) Stop() { a.runners.Stop() a.logger.Info("Stopped autodiscover manager") } + +// checkConfig verifies an input config by calling CheckConfig of various factories with a retry in order to +// ensure that a config is not in conflict with an older config which is still in "stopping" state. +func (a *Autodiscover) checkConfig(config *common.Config) error { + checkConfigAttempts := 0 + maxCheckConfigAttempts := 5 + for { + err := a.factory.CheckConfig(config) + if err == nil { + return nil + } else { + if err, ok := err.(*common.ErrInputNotFinished); !ok { + // error not related to stopping input, raise it now + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s' after max attempts, won't start runner", + common.DebugString(config, true)))) + return err + } + a.logger.Debug(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s'[attempt %v], won't start runner", + common.DebugString(config, true), checkConfigAttempts+1))) + } + if checkConfigAttempts > maxCheckConfigAttempts { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s' after max attempts, won't start runner", + common.DebugString(config, true)))) + return err + } + time.Sleep(3 * time.Second) + checkConfigAttempts += 1 + } +} diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 39df134b809..033146a84d4 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -160,6 +160,7 @@ func (p *pod) OnUpdate(obj interface{}) { } } time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) + return } p.logger.Debugf("Watcher Pod update: %+v", obj) diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go new file mode 100644 index 00000000000..68fecb8f550 --- /dev/null +++ b/libbeat/common/errors.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package common + +import ( + "fmt" +) + +// ErrInputNotFinished struct for reporting errors related to not finished inputs +type ErrInputNotFinished struct { + State string +} + +// Error method of ErrInputNotFinished +func (e *ErrInputNotFinished) Error() string { + return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) +} From 9c9e630451554d60c624cfcca77f7c16b50b6870 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 13:18:21 +0300 Subject: [PATCH 2/7] small fix Signed-off-by: chrismark --- libbeat/autodiscover/autodiscover.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 42bed27e91f..032efa172aa 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -306,24 +306,26 @@ func (a *Autodiscover) checkConfig(config *common.Config) error { if err == nil { return nil } else { - if err, ok := err.(*common.ErrInputNotFinished); !ok { + if _, ok := err.(*common.ErrInputNotFinished); !ok { // error not related to stopping input, raise it now + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) + return err + } + + if checkConfigAttempts > maxCheckConfigAttempts { a.logger.Error(errors.Wrap(err, fmt.Sprintf( "Auto discover config check failed for config '%s' after max attempts, won't start runner", common.DebugString(config, true)))) return err + } else { + a.logger.Debug(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s'[attempt %v], won't start runner", + common.DebugString(config, true), checkConfigAttempts+1))) + time.Sleep(3 * time.Second) + checkConfigAttempts += 1 } - a.logger.Debug(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s'[attempt %v], won't start runner", - common.DebugString(config, true), checkConfigAttempts+1))) - } - if checkConfigAttempts > maxCheckConfigAttempts { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s' after max attempts, won't start runner", - common.DebugString(config, true)))) - return err } - time.Sleep(3 * time.Second) - checkConfigAttempts += 1 } } From 52b5bd4836d2376734e3320a022a9c29f4bd8346 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 14:15:25 +0300 Subject: [PATCH 3/7] Remove retry logic and ignore ErrInputNotFinished errors Signed-off-by: chrismark --- filebeat/input/runnerfactory.go | 4 +++ libbeat/autodiscover/autodiscover.go | 39 +++------------------------- 2 files changed, 8 insertions(+), 35 deletions(-) diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 179057c4373..8790f8019fd 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -59,5 +59,9 @@ func (r *RunnerFactory) Create( func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { _, err := r.Create(pipeline.NewNilPipeline(), cfg) + if _, ok := err.(*common.ErrInputNotFinished); ok { + // error is related state + return nil + } return err } diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 032efa172aa..e26a2521c16 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -204,8 +204,11 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.checkConfig(config) + err = a.factory.CheckConfig(config) if err != nil { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) continue } @@ -295,37 +298,3 @@ func (a *Autodiscover) Stop() { a.runners.Stop() a.logger.Info("Stopped autodiscover manager") } - -// checkConfig verifies an input config by calling CheckConfig of various factories with a retry in order to -// ensure that a config is not in conflict with an older config which is still in "stopping" state. -func (a *Autodiscover) checkConfig(config *common.Config) error { - checkConfigAttempts := 0 - maxCheckConfigAttempts := 5 - for { - err := a.factory.CheckConfig(config) - if err == nil { - return nil - } else { - if _, ok := err.(*common.ErrInputNotFinished); !ok { - // error not related to stopping input, raise it now - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s', won't start runner", - common.DebugString(config, true)))) - return err - } - - if checkConfigAttempts > maxCheckConfigAttempts { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s' after max attempts, won't start runner", - common.DebugString(config, true)))) - return err - } else { - a.logger.Debug(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s'[attempt %v], won't start runner", - common.DebugString(config, true), checkConfigAttempts+1))) - time.Sleep(3 * time.Second) - checkConfigAttempts += 1 - } - } - } -} From 17393cb007091839fd3ab7d9bc6dddd1a5505f6d Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 14:22:38 +0300 Subject: [PATCH 4/7] changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d8a51a18081..be566c9bad3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,6 +151,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix terminating pod autodiscover issue. {pull}20084[20084] - Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054] - Output errors when Kibana index pattern setup fails. {pull}20121[20121] +- [Autodiscovery] Ignore ErrInputNotFinished errors in autodiscover config checks. {pull}20305[20305] *Auditbeat* From caf9e7e8c7f2bf09596a4ddb75e4ca0c1724fdc4 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 15:04:19 +0300 Subject: [PATCH 5/7] fix changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index be566c9bad3..f29ce3173d2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,7 +151,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix terminating pod autodiscover issue. {pull}20084[20084] - Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054] - Output errors when Kibana index pattern setup fails. {pull}20121[20121] -- [Autodiscovery] Ignore ErrInputNotFinished errors in autodiscover config checks. {pull}20305[20305] +- Fix issue in autodiscover that kept inputs stopped after config updates. {pull}20305[20305] *Auditbeat* From 265b922631cea69540fa055462ecac4126990bd2 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 15:50:15 +0300 Subject: [PATCH 6/7] fix comment Signed-off-by: chrismark --- filebeat/input/runnerfactory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 8790f8019fd..f4973e47948 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -60,7 +60,7 @@ func (r *RunnerFactory) Create( func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { _, err := r.Create(pipeline.NewNilPipeline(), cfg) if _, ok := err.(*common.ErrInputNotFinished); ok { - // error is related state + // error is related to state, and hence config can be considered valid return nil } return err From ec3d0147eb1321354fd2e775a51d7d63e717243b Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 29 Jul 2020 17:31:45 +0300 Subject: [PATCH 7/7] Move location of errors.go from libeat to filebeat Signed-off-by: chrismark --- {libbeat/common => filebeat/input}/errors.go | 2 +- filebeat/input/log/input.go | 2 +- filebeat/input/runnerfactory.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename {libbeat/common => filebeat/input}/errors.go (98%) diff --git a/libbeat/common/errors.go b/filebeat/input/errors.go similarity index 98% rename from libbeat/common/errors.go rename to filebeat/input/errors.go index 68fecb8f550..098156abf91 100644 --- a/libbeat/common/errors.go +++ b/filebeat/input/errors.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package common +package input import ( "fmt" diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 365da416ed3..a1837bbd471 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -175,7 +175,7 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return &common.ErrInputNotFinished{State: state.String()} + return &input.ErrInputNotFinished{State: state.String()} } // Convert state to current identifier if different diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index f4973e47948..afd67d4e08a 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -59,7 +59,7 @@ func (r *RunnerFactory) Create( func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { _, err := r.Create(pipeline.NewNilPipeline(), cfg) - if _, ok := err.(*common.ErrInputNotFinished); ok { + if _, ok := err.(*ErrInputNotFinished); ok { // error is related to state, and hence config can be considered valid return nil }