From 44077a0ea3a932f51cb4492ec6102a55d578006d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 10 May 2019 19:00:59 +0200 Subject: [PATCH] Fix goroutine leak on initialization failures of log input (#12125) (#12144) Outlets are created during log input initialization, and if it fails they were never freed. Handle this case. (cherry picked from commit f2473d258b9c2c9d47d7a3bc5905f4c80ad4077f) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7568ddf7d9af..7257b91589a4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v6.7.2...6.8[Check the HEAD diff] - Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] - Fix initialization of the TCP input logger. {pull}11605[11605] +- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index cc8d575f8a8d..6d5cf35ee69f 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -76,6 +76,12 @@ func NewInput( outlet channel.Connector, context input.Context, ) (input.Input, error) { + cleanupNeeded := true + cleanupIfNeeded := func(f func() error) { + if cleanupNeeded { + f() + } + } // Note: underlying output. // The input and harvester do have different requirements @@ -87,11 +93,13 @@ func NewInput( if err != nil { return nil, err } + defer cleanupIfNeeded(out.Close) // stateOut will only be unblocked if the beat is shut down. // otherwise it can block on a full publisher pipeline, so state updates // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + defer cleanupIfNeeded(stateOut.Close) meta := context.Meta if len(meta) == 0 { @@ -137,6 +145,7 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) + cleanupNeeded = false return p, nil }