diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index ecd70a4d9c96..585496d07760 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -207,6 +207,9 @@ func (h *Harvester) Run() error { } h.stop() + + // Close reader + h.reader.Stop() }(h.state.Source) logp.Info("Harvester started for file: %s, offset: %d", h.state.Source, h.state.Offset) diff --git a/filebeat/input/log/reader.go b/filebeat/input/log/reader.go index 18beeb295bd0..02cbfa5f502a 100644 --- a/filebeat/input/log/reader.go +++ b/filebeat/input/log/reader.go @@ -73,6 +73,7 @@ type ReuseHarvester struct { Config config State file.State done chan struct{} + closeOnce sync.Once fileReader *FileHarvester message chan ReuseMessage } @@ -129,7 +130,9 @@ func (r *ReuseHarvester) OnMessage(message ReuseMessage) error { //Stop: 停止harvester func (r *ReuseHarvester) Stop() { - close(r.done) + r.closeOnce.Do(func() { + close(r.done) + }) } //HasState @@ -373,6 +376,15 @@ func (h *FileHarvester) Run() { // read file h.readerDone.Add(1) go h.loopRead() + } else { + for _, reuseReader := range h.forwarders { + select { + case <-reuseReader.done: + logp.Info("forwarder is done, delete forwarder(%s)", reuseReader.HarvesterID) + delete(h.forwarders, reuseReader.HarvesterID) + default: + } + } } if len(h.forwarders) > 0 {