From c775b3cdaa090f377edc2deab59e0abe6e704afd Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 11 Apr 2018 16:56:07 +0200 Subject: [PATCH] Filebeat: Fix leak in log harvester (#6797) This patch reorganizes a little bit how the log harvester works, so that suboutlets are only created when the harvester is ready to use them (inside Run()), instead of being passed during constructor. This prevents a memory leak caused by some internal goroutines not stopping if the harvester Setup() fails, for example when files cannot be read. Fixes #6797 --- CHANGELOG.asciidoc | 2 ++ filebeat/prospector/log/harvester.go | 36 ++++++++++++++----------- filebeat/prospector/log/prospector.go | 15 +++++++++-- filebeat/prospector/stdin/prospector.go | 4 ++- 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d966b013eac2..f9aeb8911885 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,8 @@ https://github.com/elastic/beats/compare/v6.2.4...6.2[Check the HEAD diff] *Filebeat* +- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797] + *Heartbeat* - Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616] diff --git a/filebeat/prospector/log/harvester.go b/filebeat/prospector/log/harvester.go index 6634f5f9c5ec..2ec292ffb686 100644 --- a/filebeat/prospector/log/harvester.go +++ b/filebeat/prospector/log/harvester.go @@ -52,6 +52,9 @@ var ( ErrClosed = errors.New("reader closed") ) +// OutletFactory provides an outlet for the harvester +type OutletFactory func() channel.Outleter + // Harvester contains all harvester related data type Harvester struct { id uuid.UUID @@ -74,8 +77,8 @@ type Harvester struct { encoding encoding.Encoding // event/state publishing - forwarder *harvester.Forwarder - publishState func(*util.Data) bool + outletFactory OutletFactory + publishState func(*util.Data) bool onTerminate func() } @@ -86,17 +89,18 @@ func NewHarvester( state file.State, states *file.States, publishState func(*util.Data) bool, - outlet channel.Outleter, + outletFactory OutletFactory, ) (*Harvester, error) { h := &Harvester{ - config: defaultConfig, - state: state, - states: states, - publishState: publishState, - done: make(chan struct{}), - stopWg: &sync.WaitGroup{}, - id: uuid.NewV4(), + config: defaultConfig, + state: state, + states: states, + publishState: publishState, + done: make(chan struct{}), + stopWg: &sync.WaitGroup{}, + id: uuid.NewV4(), + outletFactory: outletFactory, } if err := config.Unpack(&h.config); err != nil { @@ -115,8 +119,6 @@ func NewHarvester( } // Add outlet signal so harvester can also stop itself - outlet = channel.CloseOnSignal(outlet, h.done) - h.forwarder = harvester.NewForwarder(outlet) return h, nil } @@ -163,6 +165,10 @@ func (h *Harvester) Run() error { if h.onTerminate != nil { defer h.onTerminate() } + + outlet := channel.CloseOnSignal(h.outletFactory(), h.done) + forwarder := harvester.NewForwarder(outlet) + // This is to make sure a harvester is not started anymore if stop was already // called before the harvester was started. The waitgroup is not incremented afterwards // as otherwise it could happened that between checking for the close channel and incrementing @@ -302,7 +308,7 @@ func (h *Harvester) Run() error { // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(data) { + if !h.sendEvent(data, forwarder) { return nil } @@ -326,12 +332,12 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent -func (h *Harvester) sendEvent(data *util.Data) bool { +func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool { if h.source.HasState() { h.states.Update(data.GetState()) } - err := h.forwarder.Send(data) + err := forwarder.Send(data) return err == nil } diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index c96c5d817471..6e76146badf0 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -572,10 +572,21 @@ func (p *Prospector) isCleanInactive(state file.State) bool { return false } +// subOutletWrap returns a factory method that will wrap the passed outlet +// in a SubOutlet and memoize the result so the wrapping is done only once. +func subOutletWrap(outlet channel.Outleter) func() channel.Outleter { + var subOutlet channel.Outleter + return func() channel.Outleter { + if subOutlet == nil { + subOutlet = channel.SubOutlet(outlet) + } + return subOutlet + } +} + // createHarvester creates a new harvester instance from the given state func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually - outlet := channel.SubOutlet(p.outlet) h, err := NewHarvester( p.cfg, state, @@ -583,7 +594,7 @@ func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Har func(d *util.Data) bool { return p.stateOutlet.OnEvent(d) }, - outlet, + subOutletWrap(p.outlet), ) if err == nil { h.onTerminate = onTerminate diff --git a/filebeat/prospector/stdin/prospector.go b/filebeat/prospector/stdin/prospector.go index a1e81ef52bee..6bdcbbe6587c 100644 --- a/filebeat/prospector/stdin/prospector.go +++ b/filebeat/prospector/stdin/prospector.go @@ -73,7 +73,9 @@ func (p *Prospector) createHarvester(state file.State) (*log.Harvester, error) { h, err := log.NewHarvester( p.cfg, state, nil, nil, - p.outlet, + func() channel.Outleter { + return p.outlet + }, ) return h, err