diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index e82719f698ff..dbdde6690460 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -16,6 +16,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
@@ -171,8 +172,9 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
 		pollInterval:      c.PollInterval,
 		maxBatchFiles:     c.MaxConcurrentFiles / 2,
 		maxBatches:        c.MaxBatches,
-		previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
-		knownFiles:        []*reader.Metadata{},
+		activeFiles:       fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
+		previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
+		knownFiles:        fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2),
 	}, nil
 }
 
diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index 085389685017..3175579962c7 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -13,6 +13,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
@@ -32,8 +33,9 @@ type Manager struct {
 	maxBatches    int
 	maxBatchFiles int
 
-	previousPollFiles []*reader.Reader
-	knownFiles        []*reader.Metadata
+	activeFiles       *fileset.Fileset[*reader.Reader]
+	previousPollFiles *fileset.Fileset[*reader.Reader]
+	knownFiles        *fileset.Fileset[*reader.Metadata]
 
 	// This value approximates the expected number of files which we will find in a single poll cycle.
 	// It is updated each poll cycle using a simple moving average calculation which assigns 20% weight
@@ -51,7 +53,7 @@ func (m *Manager) Start(persister operator.Persister) error {
 		m.Warnf("finding files: %v", err)
 	} else {
 		m.movingAverageMatches = len(matches)
-		m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches))
+		m.knownFiles = fileset.New[*reader.Metadata](4 * len(matches))
 	}
 
 	if persister != nil {
@@ -63,7 +65,7 @@ func (m *Manager) Start(persister operator.Persister) error {
 
 		if len(offsets) > 0 {
 			m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
 			m.readerFactory.FromBeginning = true
-			m.knownFiles = append(m.knownFiles, offsets...)
+			m.knownFiles.Add(offsets...)
 		}
 	}
@@ -74,13 +76,14 @@ func (m *Manager) Start(persister operator.Persister) error {
 }
 
 func (m *Manager) closePreviousFiles() {
-	if len(m.knownFiles) > 4*m.movingAverageMatches {
-		m.knownFiles = m.knownFiles[m.movingAverageMatches:]
+	if m.knownFiles.Len() > 4*m.movingAverageMatches {
+		if _, err := m.knownFiles.PopN(m.movingAverageMatches); err != nil {
+			m.Errorw("Failed to remove closed files", zap.Error(err))
+		}
 	}
-	for _, r := range m.previousPollFiles {
-		m.knownFiles = append(m.knownFiles, r.Close())
+	for _, r := range m.previousPollFiles.Reset() {
+		m.knownFiles.Add(r.Close())
 	}
-	m.previousPollFiles = nil
 }
 
 // Stop will stop the file monitoring process
@@ -92,7 +95,7 @@ func (m *Manager) Stop() error {
 	m.wg.Wait()
 	m.closePreviousFiles()
 	if m.persister != nil {
-		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles.Get()); err != nil {
 			m.Errorw("save offsets", zap.Error(err))
 		}
 	}
@@ -152,11 +155,11 @@ func (m *Manager) poll(ctx context.Context) {
 	// Any new files that appear should be consumed entirely
 	m.readerFactory.FromBeginning = true
 	if m.persister != nil {
-		allCheckpoints := make([]*reader.Metadata, 0, len(m.knownFiles)+len(m.previousPollFiles))
-		allCheckpoints = append(allCheckpoints, m.knownFiles...)
-		for _, r := range m.previousPollFiles {
+		allCheckpoints := make([]*reader.Metadata, 0, m.knownFiles.Len()+m.previousPollFiles.Len())
+		for _, r := range m.previousPollFiles.Get() {
 			allCheckpoints = append(allCheckpoints, r.Metadata)
 		}
+		allCheckpoints = append(allCheckpoints, m.knownFiles.Get()...)
 		if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil {
 			m.Errorw("save offsets", zap.Error(err))
 		}
@@ -165,13 +168,13 @@ func (m *Manager) poll(ctx context.Context) {
 
 func (m *Manager) consume(ctx context.Context, paths []string) {
 	m.Debug("Consuming files", zap.Strings("paths", paths))
-	readers := m.makeReaders(paths)
+	m.makeReaders(paths)
 
-	m.preConsume(ctx, readers)
+	m.preConsume(ctx)
 
 	// read new readers to end
 	var wg sync.WaitGroup
-	for _, r := range readers {
+	for _, r := range m.activeFiles.Get() {
 		wg.Add(1)
 		go func(r *reader.Reader) {
 			defer wg.Done()
@@ -180,7 +183,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	}
 	wg.Wait()
 
-	m.postConsume(readers)
+	m.postConsume()
 }
 
 func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -211,8 +214,8 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
 // makeReader take a file path, then creates reader,
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
-func (m *Manager) makeReaders(paths []string) []*reader.Reader {
-	readers := make([]*reader.Reader, 0, len(paths))
+func (m *Manager) makeReaders(paths []string) {
+	m.activeFiles.Clear()
 OUTER:
 	for _, path := range paths {
 		fp, file := m.makeFingerprint(path)
@@ -222,7 +225,7 @@ OUTER:
 
 		// Exclude duplicate paths with the same content. This can happen when files are
 		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
-		for _, r := range readers {
+		for _, r := range m.activeFiles.Get() {
 			if fp.Equal(r.Fingerprint) {
 				if err := file.Close(); err != nil {
 					m.Debugw("problem closing file", zap.Error(err))
 				}
@@ -237,31 +240,19 @@ OUTER:
 			continue
 		}
 
-		readers = append(readers, r)
+		m.activeFiles.Add(r)
 	}
-	return readers
 }
 
 func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check previous poll cycle for match
-	for i := 0; i < len(m.previousPollFiles); i++ {
-		oldReader := m.previousPollFiles[i]
-		if fp.StartsWith(oldReader.Fingerprint) {
-			// Keep the new reader and discard the old. This ensures that if the file was
-			// copied to another location and truncated, our handle is updated.
-			m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
-			return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
-		}
+	if oldReader := m.previousPollFiles.Match(fp); oldReader != nil {
+		return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
 	}
 
 	// Iterate backwards to match newest first
-	for i := len(m.knownFiles) - 1; i >= 0; i-- {
-		oldMetadata := m.knownFiles[i]
-		if fp.StartsWith(oldMetadata.Fingerprint) {
-			// Remove the old metadata from the list. We will keep updating it and save it again later.
-			m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
-			return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
-		}
+	if oldMetadata := m.knownFiles.Match(fp); oldMetadata != nil {
+		return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
 	}
 
 	// If we don't match any previously known files, create a new reader from scratch
diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go
index 6960982801e2..ce3592beacb9 100644
--- a/pkg/stanza/fileconsumer/file_other.go
+++ b/pkg/stanza/fileconsumer/file_other.go
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 //go:build !windows
+// +build !windows
 
 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
 
@@ -15,11 +16,11 @@ import (
 // Take care of files which disappeared from the pattern since the last poll cycle
 // this can mean either files which were removed, or rotated into a name not matching the pattern
 // we do this before reading existing files to ensure we emit older log lines before newer ones
-func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) {
-	lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
+func (m *Manager) preConsume(ctx context.Context) {
+	lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
 OUTER:
-	for _, oldReader := range m.previousPollFiles {
-		for _, newReader := range newReaders {
+	for _, oldReader := range m.previousPollFiles.Get() {
+		for _, newReader := range m.activeFiles.Get() {
 			if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
 				continue OUTER
 			}
@@ -52,7 +53,7 @@ OUTER:
 
 // On non-windows platforms, we keep files open between poll cycles so that we can detect
 // and read "lost" files, which have been moved out of the matching pattern.
-func (m *Manager) postConsume(readers []*reader.Reader) {
+func (m *Manager) postConsume() {
 	m.closePreviousFiles()
-	m.previousPollFiles = readers
+	m.previousPollFiles.Add(m.activeFiles.Get()...)
 }
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index 2e203ef1ca4d..91c12a9745c7 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -1144,7 +1144,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 	require.NoError(t, longFile.Close())
 
 	// Verify we have no checkpointed files
-	require.Equal(t, 0, len(operator.knownFiles))
+	require.Equal(t, 0, operator.knownFiles.Len())
 
 	// Wait until the only line in the short file and
 	// at least one line from the long file have been consumed
@@ -1286,7 +1286,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
 	operator.wg.Wait()
 	if runtime.GOOS != "windows" {
 		// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
-		require.Len(t, operator.previousPollFiles, 1)
+		require.Equal(t, 1, operator.previousPollFiles.Len())
 	}
 
 	// keep append data to file1 and file2
diff --git a/pkg/stanza/fileconsumer/file_windows.go b/pkg/stanza/fileconsumer/file_windows.go
index 2c22b5ed4401..fa8fb786bea7 100644
--- a/pkg/stanza/fileconsumer/file_windows.go
+++ b/pkg/stanza/fileconsumer/file_windows.go
@@ -7,15 +7,15 @@
 
 import (
 	"context"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 )
 
-func (m *Manager) preConsume(_ context.Context, _ []*reader.Reader) {
+func (m *Manager) preConsume(_ context.Context) {
 }
 
 // On windows, we close files immediately after reading because they cannot be moved while open.
-func (m *Manager) postConsume(readers []*reader.Reader) {
-	m.previousPollFiles = readers
+func (m *Manager) postConsume() {
+	// Hand this poll cycle's readers to previousPollFiles so that
+	// closePreviousFiles closes them and archives their metadata.
+	m.previousPollFiles.Reset(m.activeFiles.Get()...)
 	m.closePreviousFiles()
 }
diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset.go b/pkg/stanza/fileconsumer/internal/fileset/fileset.go
index ead43a717ce6..c75814465525 100644
--- a/pkg/stanza/fileconsumer/internal/fileset/fileset.go
+++ b/pkg/stanza/fileconsumer/internal/fileset/fileset.go
@@ -39,15 +39,20 @@ func (set *Fileset[T]) Get() []T {
 	return set.readers
 }
 
-func (set *Fileset[T]) Pop() (T, error) {
-	// return first element from the array and remove it
-	var val T
-	if len(set.readers) == 0 {
-		return val, errFilesetEmpty
+// PopN removes the first n elements (oldest first) and returns them.
+func (set *Fileset[T]) PopN(n int) ([]T, error) {
+	if n <= 0 {
+		return nil, errors.New("n should be positive")
+	}
+	// Compare against n, not 0: slicing set.readers[:n] below would
+	// panic if the set holds fewer than n elements.
+	if n > len(set.readers) {
+		return nil, errFilesetEmpty
 	}
-	r := set.readers[0]
-	set.readers = slices.Delete(set.readers, 0, 1)
-	return r, nil
+	arr := make([]T, n)
+	copy(arr, set.readers[:n])
+	set.readers = slices.Delete(set.readers, 0, n)
+	return arr, nil
 }
 
 func (set *Fileset[T]) Add(readers ...T) {
diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
index 9e6d9269a4cb..b865c3570352 100644
--- a/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
+++ b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
@@ -30,12 +30,15 @@ func push[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) {
 	}
 }
 
-func pop[T Matchable](expectedErr error, expectedElemet T) func(t *testing.T, fileset *Fileset[T]) {
+func pop[T Matchable](n int, expectedErr error, expectedElements ...T) func(t *testing.T, fileset *Fileset[T]) {
 	return func(t *testing.T, fileset *Fileset[T]) {
-		el, err := fileset.Pop()
+		prevLen := fileset.Len()
+		el, err := fileset.PopN(n)
 		if expectedErr == nil {
 			require.NoError(t, err)
-			require.Equal(t, el, expectedElemet)
+			require.ElementsMatch(t, el, expectedElements)
+			require.Equal(t, n, len(el))
+			require.Equal(t, prevLen-n, fileset.Len())
 		} else {
 			require.ErrorIs(t, err, expectedErr)
 		}
@@ -102,13 +105,12 @@ func TestFilesetReader(t *testing.T) {
 			name: "test_pop",
 			ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){
 				push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),
-				pop(nil, newReader([]byte("ABCDEF"))),
-				pop(nil, newReader([]byte("QWERT"))),
-				pop(errFilesetEmpty, newReader([]byte(""))),
+				pop(2, nil, newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),
+				pop(1, errFilesetEmpty, newReader([]byte(""))),
 				reset(newReader([]byte("XYZ"))),
-				pop(nil, newReader([]byte("XYZ"))),
-				pop(errFilesetEmpty, newReader([]byte(""))),
+				pop(1, nil, newReader([]byte("XYZ"))),
+				pop(1, errFilesetEmpty, newReader([]byte(""))),
 			},
 		},
 	}
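
Reviewer note: only fragments of the new internal/fileset package are visible in the hunks above, so the self-contained Go sketch below illustrates the set semantics the Manager now relies on (Add, Len, PopN, Reset). It is an illustrative toy, not the package itself: the real Fileset constrains T to a Matchable interface and also provides Match and Clear, and the element type, error message, and file names here are invented for the example.

package main

import (
	"errors"
	"fmt"
	"slices"
)

// errFilesetEmpty mirrors the sentinel error used by the tests above;
// the exact message is invented for this sketch.
var errFilesetEmpty = errors.New("fileset is empty")

// Fileset is a toy stand-in for internal/fileset.Fileset. The real type
// constrains T to Matchable so Match can compare file fingerprints.
type Fileset[T any] struct {
	readers []T
}

func New[T any](capacity int) *Fileset[T] {
	return &Fileset[T]{readers: make([]T, 0, capacity)}
}

func (set *Fileset[T]) Len() int { return len(set.readers) }

func (set *Fileset[T]) Add(readers ...T) {
	set.readers = append(set.readers, readers...)
}

// PopN removes and returns the n oldest elements, as closePreviousFiles
// does to cap the size of knownFiles.
func (set *Fileset[T]) PopN(n int) ([]T, error) {
	if n <= 0 {
		return nil, errors.New("n should be positive")
	}
	if n > len(set.readers) {
		return nil, errFilesetEmpty
	}
	arr := make([]T, n)
	copy(arr, set.readers[:n])
	set.readers = slices.Delete(set.readers, 0, n)
	return arr, nil
}

// Reset replaces the contents and hands back the previous elements, the
// pattern postConsume uses to move activeFiles into previousPollFiles.
func (set *Fileset[T]) Reset(readers ...T) []T {
	old := set.readers
	set.readers = append(make([]T, 0, len(readers)), readers...)
	return old
}

func main() {
	knownFiles := New[string](4)
	knownFiles.Add("a.log", "b.log", "c.log")

	// Trim the oldest entries, as closePreviousFiles does once the set
	// grows past four times the moving average of matches.
	oldest, err := knownFiles.PopN(2)
	fmt.Println(oldest, knownFiles.Len(), err) // [a.log b.log] 1 <nil>

	// Swap in the current poll cycle's files, recovering the old ones.
	prev := knownFiles.Reset("d.log")
	fmt.Println(prev, knownFiles.Len()) // [c.log] 1

	// Asking for more elements than the set holds fails cleanly.
	_, err = knownFiles.PopN(5)
	fmt.Println(err) // fileset is empty
}

Note how PopN rejects an n larger than the set instead of panicking; that is the same guard the fileset.go hunk above places before slicing set.readers[:n].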