diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d966b013eac2..f9aeb8911885 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,8 @@ https://github.com/elastic/beats/compare/v6.2.4...6.2[Check the HEAD diff] *Filebeat* +- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797] + *Heartbeat* - Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616] diff --git a/filebeat/prospector/log/harvester.go b/filebeat/prospector/log/harvester.go index 6634f5f9c5ec..2ec292ffb686 100644 --- a/filebeat/prospector/log/harvester.go +++ b/filebeat/prospector/log/harvester.go @@ -52,6 +52,9 @@ var ( ErrClosed = errors.New("reader closed") ) +// OutletFactory provides an outlet for the harvester +type OutletFactory func() channel.Outleter + // Harvester contains all harvester related data type Harvester struct { id uuid.UUID @@ -74,8 +77,8 @@ type Harvester struct { encoding encoding.Encoding // event/state publishing - forwarder *harvester.Forwarder - publishState func(*util.Data) bool + outletFactory OutletFactory + publishState func(*util.Data) bool onTerminate func() } @@ -86,17 +89,18 @@ func NewHarvester( state file.State, states *file.States, publishState func(*util.Data) bool, - outlet channel.Outleter, + outletFactory OutletFactory, ) (*Harvester, error) { h := &Harvester{ - config: defaultConfig, - state: state, - states: states, - publishState: publishState, - done: make(chan struct{}), - stopWg: &sync.WaitGroup{}, - id: uuid.NewV4(), + config: defaultConfig, + state: state, + states: states, + publishState: publishState, + done: make(chan struct{}), + stopWg: &sync.WaitGroup{}, + id: uuid.NewV4(), + outletFactory: outletFactory, } if err := config.Unpack(&h.config); err != nil { @@ -115,8 +119,6 @@ func NewHarvester( } // Add outlet signal so harvester can also stop itself - outlet = channel.CloseOnSignal(outlet, h.done) - h.forwarder = harvester.NewForwarder(outlet) return h, nil } @@ -163,6 +165,10 @@ func (h *Harvester) Run() error { if h.onTerminate != nil { defer h.onTerminate() } + + outlet := channel.CloseOnSignal(h.outletFactory(), h.done) + forwarder := harvester.NewForwarder(outlet) + // This is to make sure a harvester is not started anymore if stop was already // called before the harvester was started. The waitgroup is not incremented afterwards // as otherwise it could happened that between checking for the close channel and incrementing @@ -302,7 +308,7 @@ func (h *Harvester) Run() error { // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(data) { + if !h.sendEvent(data, forwarder) { return nil } @@ -326,12 +332,12 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent -func (h *Harvester) sendEvent(data *util.Data) bool { +func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool { if h.source.HasState() { h.states.Update(data.GetState()) } - err := h.forwarder.Send(data) + err := forwarder.Send(data) return err == nil } diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index c96c5d817471..6e76146badf0 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -572,10 +572,21 @@ func (p *Prospector) isCleanInactive(state file.State) bool { return false } +// subOutletWrap returns a factory method that will wrap the passed outlet +// in a SubOutlet and memoize the result so the wrapping is done only once. +func subOutletWrap(outlet channel.Outleter) func() channel.Outleter { + var subOutlet channel.Outleter + return func() channel.Outleter { + if subOutlet == nil { + subOutlet = channel.SubOutlet(outlet) + } + return subOutlet + } +} + // createHarvester creates a new harvester instance from the given state func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually - outlet := channel.SubOutlet(p.outlet) h, err := NewHarvester( p.cfg, state, @@ -583,7 +594,7 @@ func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Har func(d *util.Data) bool { return p.stateOutlet.OnEvent(d) }, - outlet, + subOutletWrap(p.outlet), ) if err == nil { h.onTerminate = onTerminate diff --git a/filebeat/prospector/stdin/prospector.go b/filebeat/prospector/stdin/prospector.go index a1e81ef52bee..6bdcbbe6587c 100644 --- a/filebeat/prospector/stdin/prospector.go +++ b/filebeat/prospector/stdin/prospector.go @@ -73,7 +73,9 @@ func (p *Prospector) createHarvester(state file.State) (*log.Harvester, error) { h, err := log.NewHarvester( p.cfg, state, nil, nil, - p.outlet, + func() channel.Outleter { + return p.outlet + }, ) return h, err