From 0f2bb1ea28ff05e0908745dfd49d70e1c7ebb03f Mon Sep 17 00:00:00 2001 From: Rock Baek Date: Fri, 14 May 2021 21:54:51 -0700 Subject: [PATCH] fix data loss between logrotate --- operator/builtin/input/file/config.go | 2 +- operator/builtin/input/file/file.go | 264 +++++++++------------ operator/builtin/input/file/file_test.go | 12 +- operator/builtin/input/file/fingerprint.go | 2 +- operator/builtin/input/file/reader.go | 9 +- 5 files changed, 131 insertions(+), 158 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index b806b3ac..07bb309c 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -151,10 +151,10 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, firstCheck: true, cancel: func() {}, knownFiles: make([]*Reader, 0, 10), + tailingReaders: map[string][]*Reader{}, fingerprintSize: int(c.FingerprintSize), MaxLogSize: int(c.MaxLogSize), MaxConcurrentFiles: c.MaxConcurrentFiles, - SeenPaths: make(map[string]struct{}, 100), } return []operator.Operator{op}, nil diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index ad1fea14..a69a4cdd 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -45,12 +45,13 @@ type InputOperator struct { SplitFunc bufio.SplitFunc MaxLogSize int MaxConcurrentFiles int - SeenPaths map[string]struct{} persister operator.Persister knownFiles []*Reader queuedMatches []string + tailingReaders map[string][]*Reader + readers []*Reader startAtBeginning bool @@ -87,6 +88,12 @@ func (f *InputOperator) Stop() error { f.wg.Wait() f.knownFiles = nil f.cancel = nil + for _, readers := range f.tailingReaders { + for _, reader := range readers { + f.closeFile(reader.file, reader.Path) + } + } + f.tailingReaders = nil return nil } @@ -111,6 +118,7 @@ func (f *InputOperator) startPoller(ctx context.Context) { }() } +const RotatedFileTrackingLimit = 10240 // poll checks all the watched paths for new entries func (f *InputOperator) poll(ctx context.Context) { var matches []string @@ -120,12 +128,6 @@ func (f *InputOperator) poll(ctx context.Context) { if len(f.queuedMatches) > 0 { matches, f.queuedMatches = f.queuedMatches, make([]string, 0) } else { - // Increment the generation on all known readers - // This is done here because the next generation is about to start - for i := 0; i < len(f.knownFiles); i++ { - f.knownFiles[i].generation++ - } - // Get the list of paths on disk matches = getMatches(f.Include, f.Exclude) if f.firstCheck && len(matches) == 0 { @@ -135,47 +137,67 @@ func (f *InputOperator) poll(ctx context.Context) { } } } - - // Open the files first to minimize the time between listing and opening - files := make([]*os.File, 0, len(matches)) for _, path := range matches { - if _, ok := f.SeenPaths[path]; !ok { - if f.startAtBeginning { - f.Infow("Started watching file", "path", path) - } else { - f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) - } - f.SeenPaths[path] = struct{}{} - } - file, err := os.Open(path) + toTail, file, fp, err := f.isNewFileToTail(path) if err != nil { - f.Errorw("Failed to open file", zap.Error(err)) continue } - files = append(files, file) - } + if !toTail { + continue + } - readers := f.makeReaders(files) + if _, ok := f.tailingReaders[path]; !ok { // If path doesn't exist => new file to tail + reader, err := f.NewReader(path, file, fp) + if err != nil { + f.Errorw("Failed to make reader for " + path, zap.Error(err)) + f.closeFile(reader.file, reader.Path) + continue + } + if f.firstCheck { + if f.startAtBeginning { + f.Infow("Started watching file", "path", path) + } else { + f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) + } + if err := reader.InitializeOffset(f.startAtBeginning); err != nil { + f.Errorw("Failed to initialize offset for " + path , zap.Error(err)) + } + } else { + f.Infow("Started watching file", "path", path) + } + f.tailingReaders[path] = []*Reader{reader} + } else { // If path exists + reader, err := f.NewReader(path, file, fp) + if err != nil { + f.Errorw("Failed to make reader for " + path, zap.Error(err)) + f.closeFile(file, path) + continue + } + f.Infow("Log rotation detected. Started watching file", "path", path) + f.tailingReaders[path] = append(f.tailingReaders[path], reader) + // Limit tracking the iteration of rotated files + if len(f.tailingReaders[path]) > RotatedFileTrackingLimit { + var firstReader *Reader + firstReader, f.tailingReaders[path] = f.tailingReaders[path][0], f.tailingReaders[path][1:] + f.closeFile(firstReader.file, firstReader.Path) + } + } + } f.firstCheck = false var wg sync.WaitGroup - for _, reader := range readers { - wg.Add(1) - go func(r *Reader) { - defer wg.Done() - r.ReadToEnd(ctx) - }(reader) + for _, readers := range f.tailingReaders { + for _, reader := range readers { + wg.Add(1) + go func(r *Reader) { + defer wg.Done() + r.ReadToEnd(ctx) + }(reader) + } } // Wait until all the reader goroutines are finished wg.Wait() - - // Close all files - for _, file := range files { - file.Close() - } - - f.saveCurrent(readers) f.syncLastPollFiles(ctx) } @@ -205,113 +227,38 @@ func getMatches(includes, excludes []string) []string { return all } -// makeReaders takes a list of paths, then creates readers from each of those paths, -// discarding any that have a duplicate fingerprint to other files that have already -// been read this polling interval -func (f *InputOperator) makeReaders(files []*os.File) []*Reader { - // Get fingerprints for each file - fps := make([]*Fingerprint, 0, len(files)) - for _, file := range files { - fp, err := f.NewFingerprint(file) - if err != nil { - f.Errorw("Failed creating fingerprint", zap.Error(err)) - continue - } - fps = append(fps, fp) - } - - // Make a copy of the files so we don't modify the original - filesCopy := make([]*os.File, len(files)) - copy(filesCopy, files) - - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files -OUTER: - for i := 0; i < len(fps); { - fp := fps[i] - if len(fp.FirstBytes) == 0 { - // Empty file, don't read it until we can compare its fingerprint - fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) - } - - for j := 0; j < len(fps); j++ { - if i == j { - // Skip checking itself - continue - } - - fp2 := fps[j] - if fp.StartsWith(fp2) || fp2.StartsWith(fp) { - // Exclude - fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) - continue OUTER - } - } - i++ - } - - readers := make([]*Reader, 0, len(fps)) - for i := 0; i < len(fps); i++ { - reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck) - if err != nil { - f.Errorw("Failed to create reader", zap.Error(err)) - continue - } - readers = append(readers, reader) +// isNewFileToTail compares fingerprints with already tailing files to see if it is a new file or not +func (f *InputOperator) isNewFileToTail(path string) (bool, *os.File, *Fingerprint, error) { + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file", zap.Error(err)) + return false, nil, nil, err } - - return readers -} - -// saveCurrent adds the readers from this polling interval to this list of -// known files, then increments the generation of all tracked old readers -// before clearing out readers that have existed for 3 generations. -func (f *InputOperator) saveCurrent(readers []*Reader) { - // Add readers from the current, completed poll interval to the list of known files - f.knownFiles = append(f.knownFiles, readers...) - - // Clear out old readers. They are sorted such that they are oldest first, - // so we can just find the first reader whose generation is less than our - // max, and keep every reader after that - for i := 0; i < len(f.knownFiles); i++ { - reader := f.knownFiles[i] - if reader.generation <= 3 { - f.knownFiles = f.knownFiles[i:] - break - } + fp, err := f.NewFingerprint(file) + if err != nil{ + f.Errorw("Failed to make FingerPrint", zap.Error(err)) + f.closeFile(file, path) + return false, nil, nil, err } -} - -func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) { - // Check if the new path has the same fingerprint as an old path - if oldReader, ok := f.findFingerprintMatch(fp); ok { - newReader, err := oldReader.Copy(file) - if err != nil { - return nil, err - } - newReader.Path = file.Name() - return newReader, nil + if len(fp.FirstBytes) == 0 { + f.closeFile(file, path) + return false, nil, nil, nil } - // If we don't match any previously known files, create a new reader from scratch - newReader, err := f.NewReader(file.Name(), file, fp) - if err != nil { - return nil, err - } - startAtBeginning := !firstCheck || f.startAtBeginning - if err := newReader.InitializeOffset(startAtBeginning); err != nil { - return nil, fmt.Errorf("initialize offset: %s", err) + _, exist := f.findFingerprintMatch(fp) + if exist { // Already tailing + f.closeFile(file, path) + return false, nil, nil, nil } - return newReader, nil + return true, file, fp, nil } func (f *InputOperator) findFingerprintMatch(fp *Fingerprint) (*Reader, bool) { - // Iterate backwards to match newest first - for i := len(f.knownFiles) - 1; i >= 0; i-- { - oldReader := f.knownFiles[i] - if fp.StartsWith(oldReader.Fingerprint) { - return oldReader, true + for _, readers := range f.tailingReaders { + for _, reader := range readers { + if fp.StartsWith(reader.Fingerprint) { + return reader, true + } } } return nil, false @@ -324,15 +271,16 @@ func (f *InputOperator) syncLastPollFiles(ctx context.Context) { var buf bytes.Buffer enc := json.NewEncoder(&buf) - // Encode the number of known files - if err := enc.Encode(len(f.knownFiles)); err != nil { + // Encode the number of known files. + if err := enc.Encode(len(f.tailingReaders)); err != nil { f.Errorw("Failed to encode known files", zap.Error(err)) return } - // Encode each known file - for _, fileReader := range f.knownFiles { - if err := enc.Encode(fileReader); err != nil { + // Encode each known file. For files that has same path, encode only latest one. + for _, readers := range f.tailingReaders { + lastReader := readers[len(readers)-1] + if err := enc.Encode(lastReader); err != nil { f.Errorw("Failed to encode known files", zap.Error(err)) } } @@ -349,11 +297,10 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return err } + f.tailingReaders = map[string][]*Reader{} if encoded == nil { - f.knownFiles = make([]*Reader, 0, 10) return nil } - dec := json.NewDecoder(bytes.NewReader(encoded)) // Decode the number of entries @@ -361,19 +308,42 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { if err := dec.Decode(&knownFileCount); err != nil { return fmt.Errorf("decoding file count: %w", err) } - // Decode each of the known files - f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - newReader, err := f.NewReader("", nil, nil) + decodedReader, err := f.NewReader("", nil, nil) if err != nil { return err } - if err = dec.Decode(newReader); err != nil { + if err = dec.Decode(decodedReader); err != nil { return err } - f.knownFiles = append(f.knownFiles, newReader) + path := decodedReader.Path + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file while recovering checkpoints", zap.Error(err)) + f.tailingReaders[path] = []*Reader{decodedReader} + continue + } + + restoredReader, err := f.NewReader(path, file, decodedReader.Fingerprint) + if err != nil{ + f.Errorw("Failed to restore a Reader", zap.Error(err), "path", path) + continue + } + restoredReader.Offset = decodedReader.Offset + f.tailingReaders[path] = []*Reader{restoredReader} + } + + for _, readers := range f.tailingReaders { + f.Infof("Load Successful: %+v\n", readers) } return nil } + +func (f *InputOperator) closeFile(file *os.File, path string) { + err := file.Close() + if err != nil { + f.Warnw("Error closing a file", "file", path, "err", err) + } +} \ No newline at end of file diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 46583f2b..968062f3 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -40,7 +40,7 @@ import ( func newDefaultConfig(tempDir string) *InputConfig { cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 100 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} cfg.OutputIDs = []string{"fake"} @@ -1170,7 +1170,7 @@ func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { select { case e := <-c: return e - case <-time.After(time.Second): + case <-time.After(time.Second*3): require.FailNow(t, "Timed out waiting for message") return nil } @@ -1182,7 +1182,7 @@ func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { select { case e := <-c: messages = append(messages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second*3): require.FailNow(t, "Timed out waiting for message") return nil } @@ -1194,7 +1194,7 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: require.Equal(t, expected, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second*3): require.FailNow(t, "Timed out waiting for message", expected) } } @@ -1206,7 +1206,7 @@ LOOP: select { case e := <-c: receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(time.Second*5): break LOOP } } @@ -1303,7 +1303,7 @@ func TestEncodings(t *testing.T) { select { case entry := <-receivedEntries: require.Equal(t, expected, []byte(entry.Body.(string))) - case <-time.After(500 * time.Millisecond): + case <-time.After(time.Second*5): require.FailNow(t, "Timed out waiting for entry to be read") } } diff --git a/operator/builtin/input/file/fingerprint.go b/operator/builtin/input/file/fingerprint.go index 06eada2d..8620b4a9 100644 --- a/operator/builtin/input/file/fingerprint.go +++ b/operator/builtin/input/file/fingerprint.go @@ -58,7 +58,7 @@ func (f Fingerprint) Copy() *Fingerprint { // or if the new fingerprint starts with the old one func (f Fingerprint) StartsWith(old *Fingerprint) bool { l0 := len(old.FirstBytes) - if l0 == 0 { + if l0 <= 2 { return false } l1 := len(f.FirstBytes) diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 12dbf6bd..92cfb40f 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -77,6 +77,8 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { return fmt.Errorf("stat: %s", err) } f.Offset = info.Size() + } else { + f.Offset = 0 } return nil @@ -84,10 +86,11 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { // ReadToEnd will read until the end of the file func (f *Reader) ReadToEnd(ctx context.Context) { - defer f.file.Close() - + if f.fileInput == nil { + return + } if _, err := f.file.Seek(f.Offset, 0); err != nil { - f.Errorw("Failed to seek", zap.Error(err)) + f.Warnw("Failed to seek", zap.Error(err)) return }