From 6184b76f45321cadf442dd0c00ece1a13c71b224 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 25 May 2021 12:36:10 -0400 Subject: [PATCH] File Input Cleanup - Step 1 (#155) The codebase within the `file_input` operator is complex enough that functional changes are difficult to confidently implement. This PR is a minor step towards untangling the codebase. - Open files in the `makeReaders` method - Add `Reader.Close` - Remove unnecessary copying of file handles --- operator/builtin/input/file/file.go | 58 +++++++++++++-------------- operator/builtin/input/file/reader.go | 5 +++ 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index ad1fea14..fcfb2553 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -136,26 +136,7 @@ func (f *InputOperator) poll(ctx context.Context) { } } - // Open the files first to minimize the time between listing and opening - files := make([]*os.File, 0, len(matches)) - for _, path := range matches { - if _, ok := f.SeenPaths[path]; !ok { - if f.startAtBeginning { - f.Infow("Started watching file", "path", path) - } else { - f.Infow("Started watching file from end. 
To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) - } - f.SeenPaths[path] = struct{}{} - } - file, err := os.Open(path) - if err != nil { - f.Errorw("Failed to open file", zap.Error(err)) - continue - } - files = append(files, file) - } - - readers := f.makeReaders(files) + readers := f.makeReaders(matches) f.firstCheck = false var wg sync.WaitGroup @@ -171,8 +152,8 @@ func (f *InputOperator) poll(ctx context.Context) { wg.Wait() // Close all files - for _, file := range files { - file.Close() + for _, reader := range readers { + reader.Close() } f.saveCurrent(readers) @@ -208,7 +189,26 @@ func getMatches(includes, excludes []string) []string { // makeReaders takes a list of paths, then creates readers from each of those paths, // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval -func (f *InputOperator) makeReaders(files []*os.File) []*Reader { +func (f *InputOperator) makeReaders(filesPaths []string) []*Reader { + // Open the files first to minimize the time between listing and opening + files := make([]*os.File, 0, len(filesPaths)) + for _, path := range filesPaths { + if _, ok := f.SeenPaths[path]; !ok { + if f.startAtBeginning { + f.Infow("Started watching file", "path", path) + } else { + f.Infow("Started watching file from end. 
To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) + } + f.SeenPaths[path] = struct{}{} + } + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file", zap.Error(err)) + continue + } + files = append(files, file) + } + // Get fingerprints for each file fps := make([]*Fingerprint, 0, len(files)) for _, file := range files { @@ -220,18 +220,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader { fps = append(fps, fp) } - // Make a copy of the files so we don't modify the original - filesCopy := make([]*os.File, len(files)) - copy(filesCopy, files) - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files OUTER: for i := 0; i < len(fps); { fp := fps[i] if len(fp.FirstBytes) == 0 { + files[i].Close() // Empty file, don't read it until we can compare its fingerprint fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) } for j := 0; j < len(fps); j++ { @@ -243,8 +240,9 @@ OUTER: fp2 := fps[j] if fp.StartsWith(fp2) || fp2.StartsWith(fp) { // Exclude + files[i].Close() fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) 
continue OUTER } } @@ -253,7 +251,7 @@ OUTER: readers := make([]*Reader, 0, len(fps)) for i := 0; i < len(fps); i++ { - reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck) + reader, err := f.newReader(files[i], fps[i], f.firstCheck) if err != nil { f.Errorw("Failed to create reader", zap.Error(err)) continue diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 12dbf6bd..150319cd 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -117,6 +117,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) { } } +// Close will close the file +func (f *Reader) Close() error { + return f.file.Close() +} + // Emit creates an entry with the decoded message and sends it to the next // operator in the pipeline func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {