From ffd36fc62f3eb5790b693840642388664faf845d Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor
Date: Fri, 22 Mar 2019 11:17:08 +0100
Subject: [PATCH] Move worker start before provider start as queue is bounded
 (#7926) (#11376)

(cherry picked from commit d0a1e242fec0237a097147d4102e8c4fbb55991c)
---
 CHANGELOG.asciidoc                   | 1 +
 libbeat/autodiscover/autodiscover.go | 9 +++++++--
 libbeat/common/kubernetes/watcher.go | 4 ++++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 1868cadc85f9..c69ba3800725 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff]
 *Affecting all Beats*
 
 - Enforce validation for the Central Management access token. {issue}9621[9621]
+- Start autodiscover consumers before producers. {pull}7926[7926]
 
 *Auditbeat*
 
diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go
index 75b545ee4613..69d57f0eedc2 100644
--- a/libbeat/autodiscover/autodiscover.go
+++ b/libbeat/autodiscover/autodiscover.go
@@ -103,11 +103,16 @@ func (a *Autodiscover) Start() {
 	logp.Info("Starting autodiscover manager")
 	a.listener = a.bus.Subscribe(a.adapter.EventFilter()...)
 
+	// It is important to start the worker before starting the providers. On
+	// hosts that run a large number of workloads, the initial sync can easily
+	// produce more than 100 events (the size of the bounded Go channel).
+	// Starting the providers before the consumer would fill the channel up,
+	// and the worker would never get the chance to start.
+	go a.worker()
+
 	for _, provider := range a.providers {
 		provider.Start()
 	}
-
-	go a.worker()
 }
 
 func (a *Autodiscover) worker() {
diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go
index 27b1bd338c8e..6bcad2f226fc 100644
--- a/libbeat/common/kubernetes/watcher.go
+++ b/libbeat/common/kubernetes/watcher.go
@@ -70,6 +70,7 @@ type watcher struct {
 	k8sResourceFactory  func() k8s.Resource
 	items               func() []k8s.Resource
 	handler             ResourceEventHandler
+	logger              *logp.Logger
 }
 
 // NewWatcher initializes the watcher client to provide a events handler for
@@ -82,6 +83,7 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa
 		lastResourceVersion: "0",
 		ctx:                 ctx,
 		stop:                cancel,
+		logger:              logp.NewLogger("kubernetes"),
 	}
 	switch resource.(type) {
 	// add resource type which you want to support watching here
@@ -184,10 +186,12 @@ func (w *watcher) sync() error {
 		return err
 	}
 
+	w.logger.Debugf("Got %v items from the resource sync", len(w.items()))
 	for _, item := range w.items() {
 		w.onAdd(item)
 	}
 
+	w.logger.Debugf("Done syncing %v items from the resource sync", len(w.items()))
 	// Store last version
 	w.lastResourceVersion = w.resourceList.GetMetadata().GetResourceVersion()
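
The ordering constraint described in the new autodiscover.go comment generalizes
to any bounded channel: if producers can outrun a not-yet-started consumer
during an initial burst, startup deadlocks. Below is a minimal, self-contained
Go sketch of the failure mode and the fix; all names are hypothetical and this
is not the Beats implementation, though the capacity of 100 mirrors the channel
size cited in the patch comment.

// Minimal sketch of the startup-ordering rule this patch enforces:
// on a bounded channel, start the consumer before the producers.
// Hypothetical names; not the Beats code.
package main

import (
	"fmt"
	"sync"
)

func main() {
	// Bounded buffer; 100 mirrors the channel size cited in the patch comment.
	events := make(chan int, 100)

	// The fix: start the consumer (the "worker") first, so it drains the
	// channel while the producers publish their initial sync.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range events {
			// handle event
		}
	}()

	// The producers (the "providers"): an initial sync emitting 200 events,
	// more than the channel can buffer. If the consumer above were started
	// only after wg.Wait(), every producer would block on a full channel and
	// wg.Wait() would deadlock, which is the bug the patch fixes.
	var wg sync.WaitGroup
	for p := 0; p < 4; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				events <- id*1000 + i
			}
		}(p)
	}

	wg.Wait()     // all 200 sends complete because the consumer is draining
	close(events) // no more producers; let the consumer loop finish
	<-done
	fmt.Println("initial sync completed without blocking")
}

Starting the consumer first costs nothing when the initial burst is small and
removes the deadlock entirely when it is large, which is why the patch simply
reorders the two calls inside Start().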