diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4b8f0c108ce4..2cff6a140109 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] - Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 3841fc23973f..ff7ff10090f9 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -76,6 +76,7 @@ func NewInput( outlet channel.Connector, context input.Context, ) (input.Input, error) { + cleanupNeeded := true // Note: underlying output. // The input and harvester do have different requirements @@ -87,11 +88,21 @@ func NewInput( if err != nil { return nil, err } + defer func() { + if cleanupNeeded { + out.Close() + } + }() // stateOut will only be unblocked if the beat is shut down. // otherwise it can block on a full publisher pipeline, so state updates // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + defer func() { + if cleanupNeeded { + stateOut.Close() + } + }() meta := context.Meta if len(meta) == 0 { @@ -137,6 +148,7 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) + cleanupNeeded = false return p, nil }