From 43bc4f83984eba3393fe09a652b0e7aeedab2ba4 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 1 Jun 2020 16:42:45 +0300 Subject: [PATCH 01/11] Fix race conditions and config errors Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/pod.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d671cf14a22..de6a3450772 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -134,6 +134,11 @@ func (p *pod) OnAdd(obj interface{}) { func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) + p.logger.Errorf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) + if pod.Status.Phase == kubernetes.PodPending { + p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) + return + } // If Pod is in a phase where all containers in the have terminated emit a stop event if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed { p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) @@ -144,6 +149,7 @@ func (p *pod) OnUpdate(obj interface{}) { p.logger.Debugf("Watcher Pod update: %+v", obj) p.emit(pod, "stop") + time.Sleep(5 * time.Second) p.emit(pod, "start") } From 188e970ed62ee40c99cd54b325a04add82a2af52 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 1 Jun 2020 18:04:07 +0300 Subject: [PATCH 02/11] log level Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/pod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index de6a3450772..df5db103bd3 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -134,7 +134,7 @@ func (p *pod) OnAdd(obj interface{}) { func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) - p.logger.Errorf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) + p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) if pod.Status.Phase == kubernetes.PodPending { p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) return From 638f96d56a1655f3e8132b39f210dac0ce6bd38f Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 2 Jun 2020 14:57:04 +0300 Subject: [PATCH 03/11] Add delete-anyway logic as well as retry on stop/start Signed-off-by: chrismark --- libbeat/autodiscover/autodiscover.go | 23 ++++++++++++++++++- .../autodiscover/providers/kubernetes/pod.go | 5 ++-- libbeat/common/kubernetes/watcher.go | 9 ++++++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 974beb7253a..8ccad922686 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -206,7 +206,28 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { err = a.factory.CheckConfig(config) if err != nil { - a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true)))) + a.logger.Debug(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) + checkConfigAttempts := 0 + maxCheckConfigAttempts := 5 + for { + err = a.factory.CheckConfig(config) + if err == nil { + a.logger.Debug(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s'[attempt %v], won't start runner", + common.DebugString(config, true), checkConfigAttempts + 1))) + break + } + if checkConfigAttempts > maxCheckConfigAttempts { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s' after max attempts, won't start runner", + common.DebugString(config, true)))) + break + } + time.Sleep(3 * time.Second) + checkConfigAttempts += 1 + } continue } diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index df5db103bd3..bc04a6a2260 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -124,7 +124,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub // OnAdd ensures processing of service objects that are newly added func (p *pod) OnAdd(obj interface{}) { - p.logger.Debugf("Watcher Node add: %+v", obj) + p.logger.Debugf("Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") } @@ -149,13 +149,12 @@ func (p *pod) OnUpdate(obj interface{}) { p.logger.Debugf("Watcher Pod update: %+v", obj) p.emit(pod, "stop") - time.Sleep(5 * time.Second) p.emit(pod, "start") } // GenerateHints creates hints needed for hints builder func (p *pod) OnDelete(obj interface{}) { - p.logger.Debugf("Watcher Node delete: %+v", obj) + p.logger.Debugf("Watcher Pod delete: %+v", obj) time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) } diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 33cc808358a..299366ca703 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -70,6 +70,7 @@ type WatchOptions struct { type item struct { object interface{} + objectRaw interface{} state string } @@ -175,8 +176,7 @@ func (w *watcher) enqueue(obj interface{}, state string) { if err != nil { return } - - w.queue.Add(&item{key, state}) + w.queue.Add(&item{key, obj, state}) } // process gets the top of the work queue and processes the object that is received. @@ -204,6 +204,11 @@ func (w *watcher) process(ctx context.Context) bool { return nil } if !exists { + if entry.state == delete { + w.logger.Errorf("Object was not found in the store, deleting anyway!") + // delete anyway in order to clean states + w.handler.OnDelete(entry.objectRaw) + } return nil } From 078c2797c94e2129c429831f875f68d128eaec05 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 3 Jun 2020 12:37:19 +0300 Subject: [PATCH 04/11] Add specific check for InputNotFinished errors Signed-off-by: chrismark --- filebeat/input/file/state.go | 17 ++++++ filebeat/input/log/input.go | 5 +- libbeat/autodiscover/autodiscover.go | 56 +++++++++++-------- .../autodiscover/providers/kubernetes/pod.go | 5 +- libbeat/common/errors.go | 15 +++++ libbeat/common/kubernetes/watcher.go | 4 +- 6 files changed, 73 insertions(+), 29 deletions(-) create mode 100644 libbeat/common/errors.go diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index dde3c6c5421..9ce053cdf27 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -18,6 +18,7 @@ package file import ( + "fmt" "os" "strconv" "strings" @@ -97,3 +98,19 @@ func (s *State) IsEmpty() bool { len(s.Meta) == 0 && s.Timestamp.IsZero() } + +// String returns string representation of the struct +func (s *State) String() string { + return fmt.Sprintf( + "{Id: %v, Finished: %v, Fileinfo: %v, Source: %v, Offset: %v, Timestamp: %v, TTL: %v, Type: %v, Meta: %v, FileStateOS: %v}", + s.Id, + s.Finished, + s.Fileinfo, + s.Source, + s.Offset, + s.Timestamp, + s.TTL, + s.Type, + s.Meta, + s.FileStateOS) +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 1c59e5b73d3..52cc41f9e78 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -18,7 +18,6 @@ package log import ( - "errors" "fmt" "os" "path/filepath" @@ -27,6 +26,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" @@ -172,7 +173,7 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) + return &common.ErrInputNotFinished{State: state.String()} } // Update input states and send new states to registry diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 8ccad922686..51d0e798f42 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -204,30 +204,8 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.factory.CheckConfig(config) + err = a.checkConfig(config) if err != nil { - a.logger.Debug(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s', won't start runner", - common.DebugString(config, true)))) - checkConfigAttempts := 0 - maxCheckConfigAttempts := 5 - for { - err = a.factory.CheckConfig(config) - if err == nil { - a.logger.Debug(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s'[attempt %v], won't start runner", - common.DebugString(config, true), checkConfigAttempts + 1))) - break - } - if checkConfigAttempts > maxCheckConfigAttempts { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s' after max attempts, won't start runner", - common.DebugString(config, true)))) - break - } - time.Sleep(3 * time.Second) - checkConfigAttempts += 1 - } continue } @@ -249,6 +227,38 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { return updated } +// checkConfig verifies an input config by calling CheckConfig of various factories with a retry in order to +// ensure that a config is not in conflict with an older config which is "stopping" state yet. +func (a *Autodiscover) checkConfig(config *common.Config) error { + checkConfigAttempts := 0 + maxCheckConfigAttempts := 5 + for { + err := a.factory.CheckConfig(config) + if err == nil { + return nil + } else { + if err, ok := err.(*common.ErrInputNotFinished); !ok { + // error not related to stopping input, raise it now + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s' after max attempts, won't start runner", + common.DebugString(config, true)))) + return err + } + a.logger.Debug(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s'[attempt %v], won't start runner", + common.DebugString(config, true), checkConfigAttempts+1))) + } + if checkConfigAttempts > maxCheckConfigAttempts { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s' after max attempts, won't start runner", + common.DebugString(config, true)))) + return err + } + time.Sleep(3 * time.Second) + checkConfigAttempts += 1 + } +} + func (a *Autodiscover) handleStop(event bus.Event) bool { var updated bool diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index bc04a6a2260..77ea7bb9ec2 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -122,7 +122,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub return p, nil } -// OnAdd ensures processing of service objects that are newly added +// OnAdd ensures processing of pod objects that are newly added func (p *pod) OnAdd(obj interface{}) { p.logger.Debugf("Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") @@ -152,12 +152,13 @@ func (p *pod) OnUpdate(obj interface{}) { p.emit(pod, "start") } -// GenerateHints creates hints needed for hints builder +// OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { p.logger.Debugf("Watcher Pod delete: %+v", obj) time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) } +// GenerateHints creates hints needed for hints builder func (p *pod) GenerateHints(event bus.Event) bus.Event { // Try to build a config with enabled builders. Send a provider agnostic payload. // Builders are Beat specific. diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go new file mode 100644 index 00000000000..ef92eda520c --- /dev/null +++ b/libbeat/common/errors.go @@ -0,0 +1,15 @@ +package common + +import ( + "fmt" +) + +// ErrInputNotFinished struct for reporting errors related to not finished inputs +type ErrInputNotFinished struct { + state string + } + +// Error method of ErrInputNotFinished +func (e *ErrInputNotFinished) Error() string { + return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.state) +} diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 299366ca703..4875823ddc1 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -69,9 +69,9 @@ type WatchOptions struct { } type item struct { - object interface{} + object interface{} objectRaw interface{} - state string + state string } type watcher struct { From bf14d18d57a1e76c2cf11f75a835301eec53389a Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 3 Jun 2020 16:32:36 +0300 Subject: [PATCH 05/11] Add License Signed-off-by: chrismark --- libbeat/common/errors.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index ef92eda520c..257599ee758 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package common import ( @@ -6,8 +23,8 @@ import ( // ErrInputNotFinished struct for reporting errors related to not finished inputs type ErrInputNotFinished struct { - state string - } + state string +} // Error method of ErrInputNotFinished func (e *ErrInputNotFinished) Error() string { From ad14e501aa1f36c7a7f263cb08aa02f28a6d6b2a Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 3 Jun 2020 16:44:35 +0300 Subject: [PATCH 06/11] Fix state Signed-off-by: chrismark --- libbeat/common/errors.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index 257599ee758..68fecb8f550 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -23,10 +23,10 @@ import ( // ErrInputNotFinished struct for reporting errors related to not finished inputs type ErrInputNotFinished struct { - state string + State string } // Error method of ErrInputNotFinished func (e *ErrInputNotFinished) Error() string { - return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.state) + return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) } From d8c70f921618817bc1e00cad73b7a7949c606b7c Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 15 Jun 2020 10:37:09 +0300 Subject: [PATCH 07/11] Remove input related stuff Signed-off-by: chrismark --- filebeat/input/file/state.go | 17 ------------- filebeat/input/log/input.go | 4 +-- libbeat/autodiscover/autodiscover.go | 38 +++------------------------- libbeat/common/errors.go | 32 ----------------------- 4 files changed, 6 insertions(+), 85 deletions(-) delete mode 100644 libbeat/common/errors.go diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 9ce053cdf27..dde3c6c5421 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -18,7 +18,6 @@ package file import ( - "fmt" "os" "strconv" "strings" @@ -98,19 +97,3 @@ func (s *State) IsEmpty() bool { len(s.Meta) == 0 && s.Timestamp.IsZero() } - -// String returns string representation of the struct -func (s *State) String() string { - return fmt.Sprintf( - "{Id: %v, Finished: %v, Fileinfo: %v, Source: %v, Offset: %v, Timestamp: %v, TTL: %v, Type: %v, Meta: %v, FileStateOS: %v}", - s.Id, - s.Finished, - s.Fileinfo, - s.Source, - s.Offset, - s.Timestamp, - s.TTL, - s.Type, - s.Meta, - s.FileStateOS) -} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 52cc41f9e78..f22ecddd5ae 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -18,6 +18,7 @@ package log import ( + "errors" "fmt" "os" "path/filepath" @@ -26,7 +27,6 @@ import ( "sync" "time" - "github.com/pkg/errors" "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/harvester" @@ -173,7 +173,7 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return &common.ErrInputNotFinished{State: state.String()} + return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) } // Update input states and send new states to registry diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 51d0e798f42..aa2f529fdaa 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -204,9 +204,11 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.checkConfig(config) + err = a.factory.CheckConfig(config) if err != nil { - continue + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) } // Update meta no matter what @@ -227,38 +229,6 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { return updated } -// checkConfig verifies an input config by calling CheckConfig of various factories with a retry in order to -// ensure that a config is not in conflict with an older config which is "stopping" state yet. -func (a *Autodiscover) checkConfig(config *common.Config) error { - checkConfigAttempts := 0 - maxCheckConfigAttempts := 5 - for { - err := a.factory.CheckConfig(config) - if err == nil { - return nil - } else { - if err, ok := err.(*common.ErrInputNotFinished); !ok { - // error not related to stopping input, raise it now - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s' after max attempts, won't start runner", - common.DebugString(config, true)))) - return err - } - a.logger.Debug(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s'[attempt %v], won't start runner", - common.DebugString(config, true), checkConfigAttempts+1))) - } - if checkConfigAttempts > maxCheckConfigAttempts { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s' after max attempts, won't start runner", - common.DebugString(config, true)))) - return err - } - time.Sleep(3 * time.Second) - checkConfigAttempts += 1 - } -} - func (a *Autodiscover) handleStop(event bus.Event) bool { var updated bool diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go deleted file mode 100644 index 68fecb8f550..00000000000 --- a/libbeat/common/errors.go +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "fmt" -) - -// ErrInputNotFinished struct for reporting errors related to not finished inputs -type ErrInputNotFinished struct { - State string -} - -// Error method of ErrInputNotFinished -func (e *ErrInputNotFinished) Error() string { - return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) -} From b6681ee3166a1d43888405c5d5f2b1d35619f90a Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 15 Jun 2020 10:41:18 +0300 Subject: [PATCH 08/11] changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5ccd4b7d8ff..596afe59a61 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -114,6 +114,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix an issue where error messages are not accurate in mapstriface. {issue}18662[18662] {pull}18663[18663] - Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818] - Fix potential race condition in fingerprint processor. {pull}18738[18738] +- Add better handling for Kubernetes Update and Delete watcher events. {pull}18882[18882] *Auditbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index f22ecddd5ae..1c59e5b73d3 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -27,7 +27,6 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" From 2c56f4ba80ee0a50ec643c3618fbfa416168214a Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 10:34:58 +0300 Subject: [PATCH 09/11] Review changes Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/pod.go | 7 +++---- libbeat/common/kubernetes/watcher.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 77ea7bb9ec2..2b48e405927 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -135,16 +135,15 @@ func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - if pod.Status.Phase == kubernetes.PodPending { - p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) - return - } // If Pod is in a phase where all containers in the have terminated emit a stop event if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed { p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) return + } else if pod.Status.Phase == kubernetes.PodPending { + p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) + return } p.logger.Debugf("Watcher Pod update: %+v", obj) diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 4875823ddc1..606a36ac109 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -205,7 +205,7 @@ func (w *watcher) process(ctx context.Context) bool { } if !exists { if entry.state == delete { - w.logger.Errorf("Object was not found in the store, deleting anyway!") + w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) // delete anyway in order to clean states w.handler.OnDelete(entry.objectRaw) } From 943825f8de06a58d817314eaa779e5cc9f7b4a6e Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 10:38:30 +0300 Subject: [PATCH 10/11] Bring back removed line Signed-off-by: chrismark --- libbeat/autodiscover/autodiscover.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index aa2f529fdaa..e26a2521c16 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -209,6 +209,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { a.logger.Error(errors.Wrap(err, fmt.Sprintf( "Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true)))) + continue } // Update meta no matter what From 9c469450c9f1462fe9ea7637fe9542397b7ce4ec Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 14:22:57 +0300 Subject: [PATCH 11/11] replace if/else with switch Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/pod.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 2b48e405927..c4260dab1d5 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -135,13 +135,13 @@ func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - // If Pod is in a phase where all containers in the have terminated emit a stop event - if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed { + switch pod.Status.Phase { + case kubernetes.PodSucceeded, kubernetes.PodFailed: + // If Pod is in a phase where all containers in the have terminated emit a stop event p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) return - } else if pod.Status.Phase == kubernetes.PodPending { + case kubernetes.PodPending: p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) return }