Skip to content

Commit

Permalink
Move worker start before provider start as queue is bounded (elastic#…
Browse files Browse the repository at this point in the history
…7926) (elastic#11376)

(cherry picked from commit d0a1e24)
  • Loading branch information
jsoriano authored Mar 22, 2019
1 parent 9ef2924 commit ffd36fc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff]
*Affecting all Beats*

- Enforce validation for the Central Management access token. {issue}9621[9621]
- Start autodiscover consumers before producers. {pull}7926[7926]

*Auditbeat*

Expand Down
9 changes: 7 additions & 2 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,16 @@ func (a *Autodiscover) Start() {
logp.Info("Starting autodiscover manager")
a.listener = a.bus.Subscribe(a.adapter.EventFilter()...)

// It is important to start the worker first before starting the producer.
// In hosts that have large number of workloads, it is easy to have an initial
// sync of workloads to have a count that is greater than 100 (which is the size
// of the bounded Go channel. Starting the providers before the consumer would
// result in the channel filling up and never allowing the worker to start up.
go a.worker()

for _, provider := range a.providers {
provider.Start()
}

go a.worker()
}

func (a *Autodiscover) worker() {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type watcher struct {
k8sResourceFactory func() k8s.Resource
items func() []k8s.Resource
handler ResourceEventHandler
logger *logp.Logger
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -82,6 +83,7 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa
lastResourceVersion: "0",
ctx: ctx,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
}
switch resource.(type) {
// add resource type which you want to support watching here
Expand Down Expand Up @@ -184,10 +186,12 @@ func (w *watcher) sync() error {
return err
}

w.logger.Debugf("Got %v items from the resource sync", len(w.items()))
for _, item := range w.items() {
w.onAdd(item)
}

w.logger.Debugf("Done syncing %v items from the resource sync", len(w.items()))
// Store last version
w.lastResourceVersion = w.resourceList.GetMetadata().GetResourceVersion()

Expand Down

0 comments on commit ffd36fc

Please sign in to comment.