From ff1855ece4bda8e3314e9c7a6290264ab43cc8d7 Mon Sep 17 00:00:00 2001
From: Vijay Samuel
Date: Thu, 9 Aug 2018 15:22:00 -0700
Subject: [PATCH] Move worker start before provider start as queue is bounded

---
 CHANGELOG.asciidoc                   | 1 +
 libbeat/autodiscover/autodiscover.go | 9 +++++++--
 libbeat/common/kubernetes/watcher.go | 4 ++++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4927047d3240..48c5abfa6116 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -64,6 +64,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
 - The export config subcommand should not display real value for field reference. {pull}xxx[xxx]
 - The export config subcommand should not display real value for field reference. {pull}8769[8769]
 - Do not panic when no tokenizer string is configured for a dissect processor. {issue}8895[8895]
+- Start autodiscover consumers before producers. {pull}7926[7926]
 
 *Auditbeat*
 
diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go
index 5a0e800cbc0e..695f02567d90 100644
--- a/libbeat/autodiscover/autodiscover.go
+++ b/libbeat/autodiscover/autodiscover.go
@@ -104,11 +104,16 @@ func (a *Autodiscover) Start() {
 	logp.Info("Starting autodiscover manager")
 	a.listener = a.bus.Subscribe(a.adapter.EventFilter()...)
 
+	// It is important to start the worker before starting the providers.
+	// On hosts that run a large number of workloads, the initial sync can
+	// easily yield more than 100 events, which is the size of the bounded
+	// Go channel. Starting the providers before the consumer would fill up
+	// the channel and never allow the worker to start.
+	go a.worker()
+
 	for _, provider := range a.providers {
 		provider.Start()
 	}
-
-	go a.worker()
 }
 
 func (a *Autodiscover) worker() {
diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go
index 27b1bd338c8e..6bcad2f226fc 100644
--- a/libbeat/common/kubernetes/watcher.go
+++ b/libbeat/common/kubernetes/watcher.go
@@ -70,6 +70,7 @@ type watcher struct {
 	k8sResourceFactory func() k8s.Resource
 	items              func() []k8s.Resource
 	handler            ResourceEventHandler
+	logger             *logp.Logger
 }
 
 // NewWatcher initializes the watcher client to provide a events handler for
@@ -82,6 +83,7 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Watcher, error) {
 		lastResourceVersion: "0",
 		ctx:                 ctx,
 		stop:                cancel,
+		logger:              logp.NewLogger("kubernetes"),
 	}
 	switch resource.(type) {
 	// add resource type which you want to support watching here
@@ -184,10 +186,12 @@ func (w *watcher) sync() error {
 		return err
 	}
 
+	w.logger.Debugf("Got %v items from the resource sync", len(w.items()))
 	for _, item := range w.items() {
 		w.onAdd(item)
 	}
 
+	w.logger.Debugf("Done syncing %v items from the resource sync", len(w.items()))
 	// Store last version
 	w.lastResourceVersion = w.resourceList.GetMetadata().GetResourceVersion()
 
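
For illustration, here is a minimal, self-contained Go sketch of the deadlock this patch prevents. It is not Beats code: the channel, the 250-event sync, and all names are hypothetical stand-ins for the autodiscover event bus and its 100-slot worker queue.

// Standalone sketch (hypothetical, not Beats code) of the ordering rule the
// patch applies: start the consumer of a bounded channel before any producer
// that may emit more items than the channel can buffer.
package main

import "fmt"

func main() {
	// A bounded queue, like the 100-slot channel feeding the autodiscover worker.
	events := make(chan int, 100)
	done := make(chan struct{})

	// Consumer first (the "worker"): guarantees the channel keeps draining.
	go func() {
		count := 0
		for range events {
			count++
		}
		fmt.Printf("worker consumed %d events\n", count)
		close(done)
	}()

	// Producer second (the "providers"): an initial sync that emits more
	// events than the buffer holds. If these sends ran before the consumer
	// goroutine was started, send #101 would block main forever and the Go
	// runtime would abort with "all goroutines are asleep - deadlock!".
	for i := 0; i < 250; i++ {
		events <- i
	}
	close(events)
	<-done
}

Starting the consumer first is the robust fix here: no buffer size is safe against an arbitrarily large initial sync, so the start ordering, not the channel capacity, is what guarantees progress.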