diff --git a/internal/tools/go.mod b/internal/tools/go.mod index 7978180d..b9071dc7 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -6,4 +6,5 @@ require ( github.com/golangci/golangci-lint v1.40.1 github.com/google/addlicense v0.0.0-20200906110928-a0294312aa76 github.com/tcnksm/ghr v0.13.0 + github.com/vektra/mockery v1.1.2 // indirect ) diff --git a/internal/tools/go.sum b/internal/tools/go.sum index 544cc58a..7b10b615 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -657,6 +657,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.16.0/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= github.com/valyala/quicktemplate v1.6.3/go.mod h1:fwPzK2fHuYEODzJ9pkw0ipCPNHZ2tD5KW4lOuSdPKzY= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/vektra/mockery v1.1.2 h1:uc0Yn67rJpjt8U/mAZimdCKn9AeA97BOkjpmtBSlfP4= +github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -913,6 +915,7 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200227222343-706bc42d1f0d/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= +golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200324003944-a576cf524670/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200329025819-fd4102a86c65/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index b806b3ac..e00fcca5 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -151,10 +151,11 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, firstCheck: true, cancel: func() {}, knownFiles: make([]*Reader, 0, 10), + tailingReaders: map[string][]*Reader{}, + readerQueue: []*Reader{}, fingerprintSize: int(c.FingerprintSize), MaxLogSize: int(c.MaxLogSize), MaxConcurrentFiles: c.MaxConcurrentFiles, - SeenPaths: make(map[string]struct{}, 100), } return []operator.Operator{op}, nil diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index ad1fea14..aad4dcd7 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -45,12 +45,13 @@ type InputOperator struct { SplitFunc bufio.SplitFunc MaxLogSize int MaxConcurrentFiles int - SeenPaths map[string]struct{} persister operator.Persister - knownFiles []*Reader - queuedMatches []string + knownFiles []*Reader + queuedMatches []string + readerQueue []*Reader + tailingReaders map[string][]*Reader startAtBeginning bool @@ -87,6 +88,12 @@ func (f *InputOperator) Stop() error { f.wg.Wait() f.knownFiles = nil f.cancel = nil + for _, readers := range f.tailingReaders { + for _, reader := range readers { + f.closeFile(reader.file, reader.Path) + } + } + f.tailingReaders = nil return nil } @@ -111,55 +118,67 @@ func (f *InputOperator) startPoller(ctx context.Context) { }() } +const RotatedFileTrackingLimit = 10 + // poll checks all the watched paths for new entries func (f *InputOperator) poll(ctx context.Context) { - var matches []string - if len(f.queuedMatches) > f.MaxConcurrentFiles { - matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:] - } else { - if len(f.queuedMatches) > 0 { - matches, f.queuedMatches = f.queuedMatches, make([]string, 0) - } else { - // Increment the generation on all known readers - // This is done here because the next generation is about to start - for i := 0; i < len(f.knownFiles); i++ { - f.knownFiles[i].generation++ - } - - // Get the list of paths on disk - matches = getMatches(f.Include, f.Exclude) - if f.firstCheck && len(matches) == 0 { - f.Warnw("no files match the configured include patterns", "include", f.Include) - } else if len(matches) > f.MaxConcurrentFiles { - matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:] - } - } + matches := getMatches(f.Include, f.Exclude) + if f.firstCheck && len(matches) == 0 { + f.Warnw("no files match the configured include patterns", "include", f.Include) } - - // Open the files first to minimize the time between listing and opening - files := make([]*os.File, 0, len(matches)) for _, path := range matches { - if _, ok := f.SeenPaths[path]; !ok { - if f.startAtBeginning { - f.Infow("Started watching file", "path", path) - } else { - f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) - } - f.SeenPaths[path] = struct{}{} - } - file, err := os.Open(path) + reader, err := f.isNewFileToTail(path) if err != nil { - f.Errorw("Failed to open file", zap.Error(err)) continue } - files = append(files, file) - } + if reader == nil { + continue + } - readers := f.makeReaders(files) + if _, ok := f.tailingReaders[path]; !ok { // If path doesn't exist => new file to tail + if f.firstCheck { + if f.startAtBeginning { + f.Infow("Started watching file", "path", path) + } else { + f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) + } + if err := reader.InitializeOffset(f.startAtBeginning); err != nil { + f.Errorw("Failed to initialize offset for "+path, zap.Error(err)) + } + } else { + f.Infow("Started watching file", "path", path) + } + f.tailingReaders[path] = []*Reader{reader} + } else { // If path exists + f.Infow("Log rotation detected. Started watching file", "path", path) + f.tailingReaders[path] = append(f.tailingReaders[path], reader) + // Limit tracking the iteration of rotated files + if len(f.tailingReaders[path]) > RotatedFileTrackingLimit { + var firstReader *Reader + firstReader, f.tailingReaders[path] = f.tailingReaders[path][0], f.tailingReaders[path][1:] + f.closeFile(firstReader.file, firstReader.Path) + } + } + } f.firstCheck = false + readerCount := 0 + for _, readers := range f.tailingReaders { + readerCount += len(readers) + } + + if len(f.readerQueue) <= readerCount { + for _, readers := range f.tailingReaders { + f.readerQueue = append(f.readerQueue, readers...) + } + } + + count := min0(f.MaxConcurrentFiles, readerCount) + count = min0(count, len(f.readerQueue)) + polledReaders := f.readerQueue[:count] + f.readerQueue = f.readerQueue[count:] var wg sync.WaitGroup - for _, reader := range readers { + for _, reader := range polledReaders { wg.Add(1) go func(r *Reader) { defer wg.Done() @@ -169,13 +188,6 @@ func (f *InputOperator) poll(ctx context.Context) { // Wait until all the reader goroutines are finished wg.Wait() - - // Close all files - for _, file := range files { - file.Close() - } - - f.saveCurrent(readers) f.syncLastPollFiles(ctx) } @@ -205,113 +217,72 @@ func getMatches(includes, excludes []string) []string { return all } -// makeReaders takes a list of paths, then creates readers from each of those paths, -// discarding any that have a duplicate fingerprint to other files that have already -// been read this polling interval -func (f *InputOperator) makeReaders(files []*os.File) []*Reader { - // Get fingerprints for each file - fps := make([]*Fingerprint, 0, len(files)) - for _, file := range files { - fp, err := f.NewFingerprint(file) - if err != nil { - f.Errorw("Failed creating fingerprint", zap.Error(err)) - continue - } - fps = append(fps, fp) +// isNewFileToTail compares fingerprints with already tailing files to see if it is a new file or not +func (f *InputOperator) isNewFileToTail(path string) (*Reader, error) { + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file", zap.Error(err)) + return nil, err } - - // Make a copy of the files so we don't modify the original - filesCopy := make([]*os.File, len(files)) - copy(filesCopy, files) - - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files -OUTER: - for i := 0; i < len(fps); { - fp := fps[i] - if len(fp.FirstBytes) == 0 { - // Empty file, don't read it until we can compare its fingerprint - fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) - } - - for j := 0; j < len(fps); j++ { - if i == j { - // Skip checking itself - continue - } - - fp2 := fps[j] - if fp.StartsWith(fp2) || fp2.StartsWith(fp) { - // Exclude - fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) - continue OUTER - } - } - i++ + fp, err := f.NewFingerprint(file) + if err != nil { + f.Errorw("Failed to make FingerPrint", zap.Error(err)) + f.closeFile(file, path) + return nil, err } - - readers := make([]*Reader, 0, len(fps)) - for i := 0; i < len(fps); i++ { - reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck) - if err != nil { - f.Errorw("Failed to create reader", zap.Error(err)) - continue - } - readers = append(readers, reader) + if len(fp.FirstBytes) == 0 { + f.closeFile(file, path) + return nil, nil } - return readers -} + newReader, err := f.NewReader(path, file, fp) + if err != nil { + f.Errorw("Failed to make reader for "+path, zap.Error(err)) + f.closeFile(file, path) + return nil, err + } -// saveCurrent adds the readers from this polling interval to this list of -// known files, then increments the generation of all tracked old readers -// before clearing out readers that have existed for 3 generations. -func (f *InputOperator) saveCurrent(readers []*Reader) { - // Add readers from the current, completed poll interval to the list of known files - f.knownFiles = append(f.knownFiles, readers...) - - // Clear out old readers. They are sorted such that they are oldest first, - // so we can just find the first reader whose generation is less than our - // max, and keep every reader after that - for i := 0; i < len(f.knownFiles); i++ { - reader := f.knownFiles[i] - if reader.generation <= 3 { - f.knownFiles = f.knownFiles[i:] - break + existingReader, exist := f.findFingerprintMatch(fp) + if exist { + if existingReader.Path != path { // chunked rotated file. need to tail this file from previous offset. + f.Infow("Detected rotated file with a new name. Restoring the correct offset info from the original file.", "path", path) + newReader.Offset = existingReader.Offset + f.removeReader(existingReader) + return newReader, nil + } else { + // already tailing + f.closeFile(file, path) + return nil, nil } } + return newReader, nil } -func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) { - // Check if the new path has the same fingerprint as an old path - if oldReader, ok := f.findFingerprintMatch(fp); ok { - newReader, err := oldReader.Copy(file) - if err != nil { - return nil, err +func (f *InputOperator) removeReader(reader *Reader) { + f.closeFile(reader.file, reader.Path) + for i, r := range f.tailingReaders[reader.Path] { + if reader.Fingerprint.StartsWith(r.Fingerprint) { + f.tailingReaders[reader.Path] = append(f.tailingReaders[reader.Path][:i], f.tailingReaders[reader.Path][i+1:]...) + if len(f.tailingReaders[reader.Path]) == 0 { + delete(f.tailingReaders, reader.Path) + } } - newReader.Path = file.Name() - return newReader, nil - } - - // If we don't match any previously known files, create a new reader from scratch - newReader, err := f.NewReader(file.Name(), file, fp) - if err != nil { - return nil, err } - startAtBeginning := !firstCheck || f.startAtBeginning - if err := newReader.InitializeOffset(startAtBeginning); err != nil { - return nil, fmt.Errorf("initialize offset: %s", err) + for i := 0; i < len(f.readerQueue); i++ { + r := f.readerQueue[i] + if reader.Fingerprint.StartsWith(r.Fingerprint) { + f.readerQueue = append(f.readerQueue[:i], f.readerQueue[i+1:]...) + i -= 1 + } } - return newReader, nil } func (f *InputOperator) findFingerprintMatch(fp *Fingerprint) (*Reader, bool) { - // Iterate backwards to match newest first - for i := len(f.knownFiles) - 1; i >= 0; i-- { - oldReader := f.knownFiles[i] - if fp.StartsWith(oldReader.Fingerprint) { - return oldReader, true + for _, readers := range f.tailingReaders { + for _, reader := range readers { + if fp.StartsWith(reader.Fingerprint) { + return reader, true + } } } return nil, false @@ -324,15 +295,16 @@ func (f *InputOperator) syncLastPollFiles(ctx context.Context) { var buf bytes.Buffer enc := json.NewEncoder(&buf) - // Encode the number of known files - if err := enc.Encode(len(f.knownFiles)); err != nil { + // Encode the number of known files. + if err := enc.Encode(len(f.tailingReaders)); err != nil { f.Errorw("Failed to encode known files", zap.Error(err)) return } - // Encode each known file - for _, fileReader := range f.knownFiles { - if err := enc.Encode(fileReader); err != nil { + // Encode each known file. For files that has same path, encode only latest one. + for _, readers := range f.tailingReaders { + lastReader := readers[len(readers)-1] + if err := enc.Encode(lastReader); err != nil { f.Errorw("Failed to encode known files", zap.Error(err)) } } @@ -349,11 +321,10 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return err } + f.tailingReaders = map[string][]*Reader{} if encoded == nil { - f.knownFiles = make([]*Reader, 0, 10) return nil } - dec := json.NewDecoder(bytes.NewReader(encoded)) // Decode the number of entries @@ -361,19 +332,38 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { if err := dec.Decode(&knownFileCount); err != nil { return fmt.Errorf("decoding file count: %w", err) } - // Decode each of the known files - f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - newReader, err := f.NewReader("", nil, nil) + decodedReader, err := f.NewReader("", nil, nil) if err != nil { return err } - if err = dec.Decode(newReader); err != nil { + if err = dec.Decode(decodedReader); err != nil { return err } - f.knownFiles = append(f.knownFiles, newReader) + path := decodedReader.Path + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file while recovering checkpoints", zap.Error(err)) + f.tailingReaders[path] = []*Reader{decodedReader} + continue + } + + restoredReader, err := f.NewReader(path, file, decodedReader.Fingerprint) + if err != nil { + f.Errorw("Failed to restore a Reader", zap.Error(err), "path", path) + continue + } + restoredReader.Offset = decodedReader.Offset + f.tailingReaders[path] = []*Reader{restoredReader} } return nil } + +func (f *InputOperator) closeFile(file *os.File, path string) { + err := file.Close() + if err != nil { + f.Warnw("Error closing a file", "file", path, "err", err) + } +} diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 7851c7fd..fd610cb8 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -1207,7 +1207,7 @@ func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { select { case e := <-c: return e - case <-time.After(time.Second): + case <-time.After(time.Second * 3): require.FailNow(t, "Timed out waiting for message") return nil } @@ -1219,7 +1219,7 @@ func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { select { case e := <-c: messages = append(messages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second * 3): require.FailNow(t, "Timed out waiting for message") return nil } @@ -1231,7 +1231,7 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: require.Equal(t, expected, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second * 3): require.FailNow(t, "Timed out waiting for message", expected) } } @@ -1243,7 +1243,7 @@ LOOP: select { case e := <-c: receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second * 10): break LOOP } } @@ -1340,7 +1340,7 @@ func TestEncodings(t *testing.T) { select { case entry := <-receivedEntries: require.Equal(t, expected, []byte(entry.Body.(string))) - case <-time.After(500 * time.Millisecond): + case <-time.After(time.Second * 5): require.FailNow(t, "Timed out waiting for entry to be read") } } diff --git a/operator/builtin/input/file/fingerprint.go b/operator/builtin/input/file/fingerprint.go index 06eada2d..8620b4a9 100644 --- a/operator/builtin/input/file/fingerprint.go +++ b/operator/builtin/input/file/fingerprint.go @@ -58,7 +58,7 @@ func (f Fingerprint) Copy() *Fingerprint { // or if the new fingerprint starts with the old one func (f Fingerprint) StartsWith(old *Fingerprint) bool { l0 := len(old.FirstBytes) - if l0 == 0 { + if l0 <= 2 { return false } l1 := len(f.FirstBytes) diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 12dbf6bd..37c38d33 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -35,9 +35,8 @@ type Reader struct { Offset int64 Path string - generation int - fileInput *InputOperator - file *os.File + fileInput *InputOperator + file *os.File decoder *encoding.Decoder decodeBuffer []byte @@ -77,6 +76,8 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { return fmt.Errorf("stat: %s", err) } f.Offset = info.Size() + } else { + f.Offset = 0 } return nil @@ -84,10 +85,11 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { // ReadToEnd will read until the end of the file func (f *Reader) ReadToEnd(ctx context.Context) { - defer f.file.Close() - + if f.fileInput == nil { + return + } if _, err := f.file.Seek(f.Offset, 0); err != nil { - f.Errorw("Failed to seek", zap.Error(err)) + f.Warnw("Failed to seek", zap.Error(err)) return }