Skip to content

Commit

Permalink
Move worker start before provider start as queue is bounded (elastic#…
Browse files Browse the repository at this point in the history
…7926)

(cherry picked from commit f066f4f)
  • Loading branch information
vjsamuel authored and Carlos Pérez-Aradros Herce committed Nov 29, 2018
1 parent 277a767 commit 2144904
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ https://github.com/elastic/beats/compare/v6.4.0...v6.5.0[View commits]
- Fix bug in loading dashboards from zip file. {issue}8051[8051]
- Fix in-cluster kubernetes configuration on IPv6. {pull}8754[8754]
- The export config subcommand should not display real value for field reference. {pull}8769[8769]
- Do not panic when no tokenizer string is configured for a dissect processor. {issue}8895[8895]
- Start autodiscover consumers before producers. {pull}7926[7926]
- The setup command will not fail if no dashboard is available to import. {pull}8977[8977]
- Fix central management configurations reload when a configuration is removed in Kibana. {issue}9010[9010]
Expand Down
9 changes: 7 additions & 2 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,16 @@ func (a *Autodiscover) Start() {
logp.Info("Starting autodiscover manager")
a.listener = a.bus.Subscribe(a.adapter.EventFilter()...)

// It is important to start the worker first before starting the producer.
// In hosts that have large number of workloads, it is easy to have an initial
// sync of workloads to have a count that is greater than 100 (which is the size
// of the bounded Go channel. Starting the providers before the consumer would
// result in the channel filling up and never allowing the worker to start up.
go a.worker()

for _, provider := range a.providers {
provider.Start()
}

go a.worker()
}

func (a *Autodiscover) worker() {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type watcher struct {
k8sResourceFactory func() k8s.Resource
items func() []k8s.Resource
handler ResourceEventHandler
logger *logp.Logger
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -82,6 +83,7 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa
lastResourceVersion: "0",
ctx: ctx,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
}
switch resource.(type) {
// add resource type which you want to support watching here
Expand Down Expand Up @@ -184,10 +186,12 @@ func (w *watcher) sync() error {
return err
}

w.logger.Debugf("Got %v items from the resource sync", len(w.items()))
for _, item := range w.items() {
w.onAdd(item)
}

w.logger.Debugf("Done syncing %v items from the resource sync", len(w.items()))
// Store last version
w.lastResourceVersion = w.resourceList.GetMetadata().GetResourceVersion()

Expand Down

0 comments on commit 2144904

Please sign in to comment.