diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 443595e0ba0b..538e3e359d5b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Filebeat* - Fix panic when log prospector configuration fails to load. {issue}6800[6800] +- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 48a48c16cb37..af813d43011c 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -52,6 +52,9 @@ var ( ErrClosed = errors.New("reader closed") ) +// OutletFactory provides an outlet for the harvester +type OutletFactory func() channel.Outleter + // Harvester contains all harvester related data type Harvester struct { id uuid.UUID @@ -75,8 +78,8 @@ type Harvester struct { encoding encoding.Encoding // event/state publishing - forwarder *harvester.Forwarder - publishState func(*util.Data) bool + outletFactory OutletFactory + publishState func(*util.Data) bool onTerminate func() } @@ -87,17 +90,18 @@ func NewHarvester( state file.State, states *file.States, publishState func(*util.Data) bool, - outlet channel.Outleter, + outletFactory OutletFactory, ) (*Harvester, error) { h := &Harvester{ - config: defaultConfig, - state: state, - states: states, - publishState: publishState, - done: make(chan struct{}), - stopWg: &sync.WaitGroup{}, - id: uuid.NewV4(), + config: defaultConfig, + state: state, + states: states, + publishState: publishState, + done: make(chan struct{}), + stopWg: &sync.WaitGroup{}, + id: uuid.NewV4(), + outletFactory: outletFactory, } if err := config.Unpack(&h.config); err != nil { @@ -116,8 +120,6 @@ func NewHarvester( } // Add outlet signal so harvester can also stop itself - outlet = channel.CloseOnSignal(outlet, h.done) - h.forwarder = harvester.NewForwarder(outlet) return h, nil } @@ -164,6 +166,10 @@ func (h *Harvester) Run() error { if h.onTerminate != nil { defer h.onTerminate() } + + outlet := channel.CloseOnSignal(h.outletFactory(), h.done) + forwarder := harvester.NewForwarder(outlet) + // This is to make sure a harvester is not started anymore if stop was already // called before the harvester was started. The waitgroup is not incremented afterwards // as otherwise it could happened that between checking for the close channel and incrementing @@ -308,7 +314,7 @@ func (h *Harvester) Run() error { // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(data) { + if !h.sendEvent(data, forwarder) { return nil } @@ -335,12 +341,12 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent -func (h *Harvester) sendEvent(data *util.Data) bool { +func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool { if h.source.HasState() { h.states.Update(data.GetState()) } - err := h.forwarder.Send(data) + err := forwarder.Send(data) return err == nil } diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index bffe042a4191..88be68fde1c3 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -572,10 +572,21 @@ func (p *Input) isCleanInactive(state file.State) bool { return false } +// subOutletWrap returns a factory method that will wrap the passed outlet +// in a SubOutlet and memoize the result so the wrapping is done only once. +func subOutletWrap(outlet channel.Outleter) func() channel.Outleter { + var subOutlet channel.Outleter + return func() channel.Outleter { + if subOutlet == nil { + subOutlet = channel.SubOutlet(outlet) + } + return subOutlet + } +} + // createHarvester creates a new harvester instance from the given state func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually - outlet := channel.SubOutlet(p.outlet) h, err := NewHarvester( p.cfg, state, @@ -583,7 +594,7 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste func(d *util.Data) bool { return p.stateOutlet.OnEvent(d) }, - outlet, + subOutletWrap(p.outlet), ) if err == nil { h.onTerminate = onTerminate diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 81aa4dab29d3..025eb064d816 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -73,7 +73,9 @@ func (p *Input) createHarvester(state file.State) (*log.Harvester, error) { h, err := log.NewHarvester( p.cfg, state, nil, nil, - p.outlet, + func() channel.Outleter { + return p.outlet + }, ) return h, err