diff --git a/go.mod b/go.mod index c947154115f5e..6d91047a79495 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( k8s.io/klog v1.0.0 ) -replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 +replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible diff --git a/go.sum b/go.sum index 450aef40f0ba7..38df66c76dffc 100644 --- a/go.sum +++ b/go.sum @@ -591,8 +591,8 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= -github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 h1:eeBhshivxpgHEX78QxJkoL251Pjr0B2GL59ZsivnplU= -github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7/go.mod h1:aS6CMYGLEIABOzX3OL8SqZ3zAZCGN7nmBnqgnyJGxyA= +github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 h1:fGgFrAraMB0BaPfYumu+iulfDXwHm+GFyHA4xEtBqI8= +github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03/go.mod h1:GIMXMPB/lRAllP5rVDvcGif87ryO2hgD7tCtHMdHrho= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg= @@ -1413,6 +1413,7 @@ golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1601,6 +1602,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.2.1/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= @@ -1617,6 +1619,7 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index 190c9e17cecaa..4c5c7cd4ae697 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -163,9 +163,9 @@ func (t *FileTarget) run() { defer func() { helpers.LogError("closing watcher", t.watcher.Close) for _, v := range t.tails { - v.stop(false) + v.stop() } - level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved") + level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path) close(t.done) }() @@ -313,7 +313,7 @@ func (t *FileTarget) startTailing(ps []string) { func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { for _, p := range ps { if tailer, ok := t.tails[p]; ok { - tailer.stop(true) + tailer.stop() t.positions.Remove(tailer.path) delete(t.tails, p) } diff --git a/pkg/promtail/targets/file/tailer.go b/pkg/promtail/targets/file/tailer.go index f0ae20be85dcf..8860245e6cbf6 100644 --- a/pkg/promtail/targets/file/tailer.go +++ b/pkg/promtail/targets/file/tailer.go @@ -25,9 +25,11 @@ type tailer struct { tail *tail.Tail posAndSizeMtx sync.Mutex + stopOnce sync.Once running *atomic.Bool - quit chan struct{} + posquit chan struct{} + posdone chan struct{} done chan struct{} } @@ -69,35 +71,29 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. path: path, tail: tail, running: atomic.NewBool(false), - quit: make(chan struct{}), + posquit: make(chan struct{}), + posdone: make(chan struct{}), done: make(chan struct{}), } tail.Logger = util.NewLogAdapter(logger) - go tailer.run() + go tailer.readLines() + go tailer.updatePosition() filesActive.Add(1.) return tailer, nil } -func (t *tailer) run() { - level.Info(t.logger).Log("msg", "start tailing file", "path", t.path) +// updatePosition is run in a goroutine and checks the current size of the file and saves it to the positions file +// at a regular interval. If there is ever an error it stops the tailer and exits, the tailer will be re-opened +// by the filetarget sync method if it still exists and will start reading from the last successful entry in the +// positions file. +func (t *tailer) updatePosition() { positionSyncPeriod := t.positions.SyncPeriod() positionWait := time.NewTicker(positionSyncPeriod) - t.running.Store(true) - - // This function runs in a goroutine, if it exits this tailer will never do any more tailing. - // Clean everything up. defer func() { - err := t.tail.Stop() - if err != nil { - level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err) - } - positionWait.Stop() - t.cleanupMetrics() - t.running.Store(false) - - close(t.done) + level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path) + close(t.posdone) }() for { @@ -105,28 +101,56 @@ func (t *tailer) run() { case <-positionWait.C: err := t.markPositionAndSize() if err != nil { - level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) + level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) + err := t.tail.Stop() + if err != nil { + level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", err) + } return } + case <-t.posquit: + return + } + } +} + +// readLines runs in a goroutine and consumes the t.tail.Lines channel from the underlying tailer. +// it will only exit when that channel is closed. This is important to avoid a deadlock in the underlying +// tailer which can happen if there are unread lines in this channel and the Stop method on the tailer +// is called, the underlying tailer will never exit if there are unread lines in the t.tail.Lines channel +func (t *tailer) readLines() { + level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path) + + t.running.Store(true) + + // This function runs in a goroutine, if it exits this tailer will never do any more tailing. + // Clean everything up. + defer func() { + t.cleanupMetrics() + t.running.Store(false) + level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) + close(t.done) + }() + for { + select { case line, ok := <-t.tail.Lines: if !ok { + level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.path) return } // Note currently the tail implementation hardcodes Err to nil, this should never hit. if line.Err != nil { - level.Error(t.logger).Log("msg", "error reading line", "path", t.path, "error", line.Err) + level.Error(t.logger).Log("msg", "tail routine: error reading line", "path", t.path, "error", line.Err) continue } readLines.WithLabelValues(t.path).Inc() logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text))) if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil { - level.Error(t.logger).Log("msg", "error handling line", "path", t.path, "error", err) + level.Error(t.logger).Log("msg", "tail routine: error handling line", "path", t.path, "error", err) } - case <-t.quit: - return } } } @@ -136,33 +160,50 @@ func (t *tailer) markPositionAndSize() error { t.posAndSizeMtx.Lock() defer t.posAndSizeMtx.Unlock() - pos, err := t.tail.Tell() + size, err := t.tail.Size() if err != nil { + // If the file no longer exists, no need to save position information + if err == os.ErrNotExist { + level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist") + return nil + } return err } - readBytes.WithLabelValues(t.path).Set(float64(pos)) - t.positions.Put(t.path, pos) + totalBytes.WithLabelValues(t.path).Set(float64(size)) - size, err := t.tail.Size() + pos, err := t.tail.Tell() if err != nil { return err } - totalBytes.WithLabelValues(t.path).Set(float64(size)) + readBytes.WithLabelValues(t.path).Set(float64(pos)) + t.positions.Put(t.path, pos) return nil } -func (t *tailer) stop(removed bool) { - // Save the current position before shutting down tailer - if !removed { +func (t *tailer) stop() { + // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once + // we wrap the stop in a sync.Once. + t.stopOnce.Do(func() { + // Shut down the position marker thread + close(t.posquit) + <-t.posdone + + // Save the current position before shutting down tailer err := t.markPositionAndSize() if err != nil { level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) } - } - close(t.quit) - <-t.done - level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) + + // Stop the underlying tailer + err = t.tail.Stop() + if err != nil { + level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) + } + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-t.done + level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) + }) return } diff --git a/vendor/github.com/hpcloud/tail/.gitignore b/vendor/github.com/hpcloud/tail/.gitignore index 6d9953c3c6ac2..748c974ddb6c4 100644 --- a/vendor/github.com/hpcloud/tail/.gitignore +++ b/vendor/github.com/hpcloud/tail/.gitignore @@ -1,3 +1,4 @@ .test .go +.idea diff --git a/vendor/github.com/hpcloud/tail/go.mod b/vendor/github.com/hpcloud/tail/go.mod new file mode 100644 index 0000000000000..2ebebb207a5e5 --- /dev/null +++ b/vendor/github.com/hpcloud/tail/go.mod @@ -0,0 +1,10 @@ +module github.com/hpcloud/tail + +go 1.13 + +require ( + golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2 // indirect + gopkg.in/fsnotify.v1 v1.2.1 // indirect + gopkg.in/fsnotify/fsnotify.v1 v1.4.7 + gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947 +) diff --git a/vendor/github.com/hpcloud/tail/go.sum b/vendor/github.com/hpcloud/tail/go.sum new file mode 100644 index 0000000000000..e41866c203825 --- /dev/null +++ b/vendor/github.com/hpcloud/tail/go.sum @@ -0,0 +1,7 @@ +golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2 h1:wAW1U21MfVN0sUipAD8952TBjGXMRHFKQugDlQ9RwwE= +golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/fsnotify.v1 v1.2.1/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= +gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= +gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947 h1:aNEcl02ps/eZaBJi2LycKl0jPsUryySSSgrCxieZRYA= +gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/vendor/github.com/hpcloud/tail/tail.go b/vendor/github.com/hpcloud/tail/tail.go index c93c8e6c7577b..db7dc49a7438b 100644 --- a/vendor/github.com/hpcloud/tail/tail.go +++ b/vendor/github.com/hpcloud/tail/tail.go @@ -86,7 +86,8 @@ type Tail struct { tomb.Tomb // provides: Done, Kill, Dying - lk sync.Mutex + fileMtx sync.Mutex + lk sync.Mutex } var ( @@ -139,29 +140,38 @@ func TailFile(filename string, config Config) (*Tail, error) { // But this value is not very accurate. // it may readed one line in the chan(tail.Lines), // so it may lost one line. -func (tail *Tail) Tell() (offset int64, err error) { - if tail.file == nil { - return +func (tail *Tail) Tell() (int64, error) { + tail.fileMtx.Lock() + f := tail.file + tail.fileMtx.Unlock() + if f == nil { + return 0, os.ErrNotExist } - offset, err = tail.file.Seek(0, io.SeekCurrent) + offset, err := f.Seek(0, io.SeekCurrent) if err != nil { - return + return 0, err } tail.lk.Lock() defer tail.lk.Unlock() if tail.reader == nil { - return + return 0, nil } offset -= int64(tail.reader.Buffered()) - return + return offset, nil } // Size returns the length in bytes of the file being tailed, // or 0 with an error if there was an error Stat'ing the file. func (tail *Tail) Size() (int64, error) { - fi, err := tail.file.Stat() + tail.fileMtx.Lock() + f := tail.file + tail.fileMtx.Unlock() + if f == nil { + return 0, os.ErrNotExist + } + fi, err := f.Stat() if err != nil { return 0, err } @@ -189,6 +199,8 @@ func (tail *Tail) close() { } func (tail *Tail) closeFile() { + tail.fileMtx.Lock() + defer tail.fileMtx.Unlock() if tail.file != nil { tail.file.Close() tail.file = nil @@ -211,7 +223,9 @@ func (tail *Tail) reopen(truncated bool) error { tail.closeFile() for { var err error + tail.fileMtx.Lock() tail.file, err = OpenFile(tail.Filename) + tail.fileMtx.Unlock() if err != nil { if os.IsNotExist(err) { tail.Logger.Printf("Waiting for %s to appear...", tail.Filename) diff --git a/vendor/modules.txt b/vendor/modules.txt index 47b2c474d9032..5a4d842f2d980 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -516,7 +516,7 @@ github.com/hashicorp/golang-lru/simplelru github.com/hashicorp/memberlist # github.com/hashicorp/serf v0.9.0 github.com/hashicorp/serf/coordinate -# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 +# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 ## explicit github.com/hpcloud/tail github.com/hpcloud/tail/ratelimiter @@ -1474,7 +1474,7 @@ rsc.io/binaryregexp/syntax sigs.k8s.io/structured-merge-diff/v3/value # sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml -# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 +# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 # github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible # k8s.io/client-go => k8s.io/client-go v0.18.3 # github.com/satori/go.uuid => github.com/satori/go.uuid v1.2.0