Commit: chore: initial commit

Vihas Splunk committed Jan 23, 2024
1 parent bd1d706 commit dc2aa54

Showing 7 changed files with 62 additions and 62 deletions.
6 changes: 4 additions & 2 deletions pkg/stanza/fileconsumer/config.go
@@ -16,6 +16,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
@@ -171,8 +172,9 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
 		pollInterval:      c.PollInterval,
 		maxBatchFiles:     c.MaxConcurrentFiles / 2,
 		maxBatches:        c.MaxBatches,
-		previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
-		knownFiles:        []*reader.Metadata{},
+		activeFiles:       fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
+		previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
+		knownFiles:        fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2),
 	}, nil
 }
65 changes: 28 additions & 37 deletions pkg/stanza/fileconsumer/file.go
@@ -13,6 +13,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
@@ -32,8 +33,9 @@ type Manager struct {
 	maxBatches    int
 	maxBatchFiles int

-	previousPollFiles []*reader.Reader
-	knownFiles        []*reader.Metadata
+	activeFiles       *fileset.Fileset[*reader.Reader]
+	previousPollFiles *fileset.Fileset[*reader.Reader]
+	knownFiles        *fileset.Fileset[*reader.Metadata]

 	// This value approximates the expected number of files which we will find in a single poll cycle.
 	// It is updated each poll cycle using a simple moving average calculation which assigns 20% weight
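A note on the moving average the struct comment describes: a minimal, self-contained sketch, assuming the "20% weight" goes to the newest poll's match count (the collector's exact constant and rounding may differ):

    package main

    import "fmt"

    // updateMovingAverage keeps 80% of the previous estimate and gives 20%
    // weight to the newest measurement, using integer arithmetic.
    func updateMovingAverage(avg, latestMatches int) int {
        return (avg*4 + latestMatches) / 5
    }

    func main() {
        avg := 0
        for _, matches := range []int{10, 10, 50, 10} {
            avg = updateMovingAverage(avg, matches)
            fmt.Println(avg) // 2, 3, 12, 11
        }
    }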
@@ -51,7 +53,7 @@ func (m *Manager) Start(persister operator.Persister) error {
 		m.Warnf("finding files: %v", err)
 	} else {
 		m.movingAverageMatches = len(matches)
-		m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches))
+		m.knownFiles = fileset.New[*reader.Metadata](4 * len(matches))
 	}

 	if persister != nil {
@@ -63,7 +65,7 @@ func (m *Manager) Start(persister operator.Persister) error {
 		if len(offsets) > 0 {
 			m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
 			m.readerFactory.FromBeginning = true
-			m.knownFiles = append(m.knownFiles, offsets...)
+			m.knownFiles.Add(offsets...)
 		}
 	}

@@ -74,13 +76,14 @@ func (m *Manager) Start(persister operator.Persister) error {
 }

 func (m *Manager) closePreviousFiles() {
-	if len(m.knownFiles) > 4*m.movingAverageMatches {
-		m.knownFiles = m.knownFiles[m.movingAverageMatches:]
+	if m.knownFiles.Len() > 4*m.movingAverageMatches {
+		if _, err := m.knownFiles.PopN(m.movingAverageMatches); err != nil {
+			m.Errorw("Failed to remove closed files", zap.Error(err))
+		}
 	}
-	for _, r := range m.previousPollFiles {
-		m.knownFiles = append(m.knownFiles, r.Close())
+	for _, r := range m.previousPollFiles.Reset() {
+		m.knownFiles.Add(r.Close())
 	}
-	m.previousPollFiles = nil
 }

 // Stop will stop the file monitoring process
@@ -92,7 +95,7 @@ func (m *Manager) Stop() error {
 	m.wg.Wait()
 	m.closePreviousFiles()
 	if m.persister != nil {
-		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles.Get()); err != nil {
 			m.Errorw("save offsets", zap.Error(err))
 		}
 	}
@@ -152,11 +155,11 @@ func (m *Manager) poll(ctx context.Context) {
 	// Any new files that appear should be consumed entirely
 	m.readerFactory.FromBeginning = true
 	if m.persister != nil {
-		allCheckpoints := make([]*reader.Metadata, 0, len(m.knownFiles)+len(m.previousPollFiles))
-		allCheckpoints = append(allCheckpoints, m.knownFiles...)
-		for _, r := range m.previousPollFiles {
+		allCheckpoints := make([]*reader.Metadata, 0, m.knownFiles.Len()+m.previousPollFiles.Len())
+		for _, r := range m.previousPollFiles.Get() {
 			allCheckpoints = append(allCheckpoints, r.Metadata)
 		}
+		allCheckpoints = append(allCheckpoints, m.knownFiles.Get()...)
 		if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil {
 			m.Errorw("save offsets", zap.Error(err))
 		}
@@ -165,13 +168,13 @@ func (m *Manager) poll(ctx context.Context) {

 func (m *Manager) consume(ctx context.Context, paths []string) {
 	m.Debug("Consuming files", zap.Strings("paths", paths))
-	readers := m.makeReaders(paths)
+	m.makeReaders(paths)

-	m.preConsume(ctx, readers)
+	m.preConsume(ctx)

 	// read new readers to end
 	var wg sync.WaitGroup
-	for _, r := range readers {
+	for _, r := range m.activeFiles.Get() {
 		wg.Add(1)
 		go func(r *reader.Reader) {
 			defer wg.Done()
@@ -180,7 +183,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	}
 	wg.Wait()

-	m.postConsume(readers)
+	m.postConsume()
 }

 func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -211,8 +214,8 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
 // makeReaders takes file paths, then creates readers,
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
-func (m *Manager) makeReaders(paths []string) []*reader.Reader {
-	readers := make([]*reader.Reader, 0, len(paths))
+func (m *Manager) makeReaders(paths []string) {
+	m.activeFiles.Clear()
 OUTER:
 	for _, path := range paths {
 		fp, file := m.makeFingerprint(path)
@@ -222,7 +225,7 @@

 		// Exclude duplicate paths with the same content. This can happen when files are
 		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
-		for _, r := range readers {
+		for _, r := range m.activeFiles.Get() {
 			if fp.Equal(r.Fingerprint) {
 				if err := file.Close(); err != nil {
 					m.Debugw("problem closing file", zap.Error(err))
@@ -237,31 +240,19 @@
 			continue
 		}

-		readers = append(readers, r)
+		m.activeFiles.Add(r)
 	}
-	return readers
 }

 func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check previous poll cycle for match
-	for i := 0; i < len(m.previousPollFiles); i++ {
-		oldReader := m.previousPollFiles[i]
-		if fp.StartsWith(oldReader.Fingerprint) {
-			// Keep the new reader and discard the old. This ensures that if the file was
-			// copied to another location and truncated, our handle is updated.
-			m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
-			return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
-		}
+	if oldReader := m.previousPollFiles.Match(fp); oldReader != nil {
+		return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
 	}

-	// Iterate backwards to match newest first
-	for i := len(m.knownFiles) - 1; i >= 0; i-- {
-		oldMetadata := m.knownFiles[i]
-		if fp.StartsWith(oldMetadata.Fingerprint) {
-			// Remove the old metadata from the list. We will keep updating it and save it again later.
-			m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
-			return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
-		}
+	if oldMetadata := m.knownFiles.Match(fp); oldMetadata != nil {
+		return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
 	}

 	// If we don't match any previously known files, create a new reader from scratch
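The deleted scanning loops above are what the new previousPollFiles.Match and knownFiles.Match calls encapsulate. A minimal, self-contained sketch of those semantics, with hypothetical stand-in types; the real internal/fileset implementation may differ in ordering and interface details:

    package main

    import (
        "bytes"
        "fmt"
        "slices"
    )

    // Fingerprint is a hypothetical stand-in for internal/fingerprint.
    type Fingerprint struct{ FirstBytes []byte }

    // StartsWith reports whether old is a non-empty prefix of f.
    func (f *Fingerprint) StartsWith(old *Fingerprint) bool {
        return len(old.FirstBytes) > 0 && bytes.HasPrefix(f.FirstBytes, old.FirstBytes)
    }

    type entry struct {
        name string
        fp   *Fingerprint
    }

    // match mirrors what the deleted loops did: find the first remembered entry
    // whose fingerprint is a prefix of fp, remove it from the set, and hand it
    // back so its metadata can seed the new reader.
    func match(entries *[]*entry, fp *Fingerprint) *entry {
        for i, e := range *entries {
            if fp.StartsWith(e.fp) {
                *entries = slices.Delete(*entries, i, i+1)
                return e
            }
        }
        return nil
    }

    func main() {
        known := []*entry{{name: "old.log", fp: &Fingerprint{FirstBytes: []byte("ABC")}}}
        grown := &Fingerprint{FirstBytes: []byte("ABCDEF")} // same file after more data was appended
        if e := match(&known, grown); e != nil {
            fmt.Println("resuming", e.name, "remaining:", len(known)) // resuming old.log remaining: 0
        }
    }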
13 changes: 7 additions & 6 deletions pkg/stanza/fileconsumer/file_other.go
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0

 //go:build !windows
+// +build !windows

 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

@@ -15,11 +16,11 @@ import (
 // Take care of files which disappeared from the pattern since the last poll cycle.
 // This can mean either files which were removed, or rotated into a name not matching the pattern.
 // We do this before reading existing files to ensure we emit older log lines before newer ones.
-func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) {
-	lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
+func (m *Manager) preConsume(ctx context.Context) {
+	lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
 OUTER:
-	for _, oldReader := range m.previousPollFiles {
-		for _, newReader := range newReaders {
+	for _, oldReader := range m.previousPollFiles.Get() {
+		for _, newReader := range m.activeFiles.Get() {
 			if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
 				continue OUTER
 			}
@@ -52,7 +53,7 @@

 // On non-windows platforms, we keep files open between poll cycles so that we can detect
 // and read "lost" files, which have been moved out of the matching pattern.
-func (m *Manager) postConsume(readers []*reader.Reader) {
+func (m *Manager) postConsume() {
 	m.closePreviousFiles()
-	m.previousPollFiles = readers
+	m.previousPollFiles.Add(m.activeFiles.Get()...)
 }
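preConsume above computes a set difference: any reader from the previous poll whose fingerprint no longer prefixes an active reader is "lost" and gets one final read before it is closed. A self-contained sketch of that selection, with plain strings standing in for fingerprints:

    package main

    import (
        "fmt"
        "strings"
    )

    // lostReaders returns the previous poll's entries that no active entry
    // extends, mirroring the labeled-continue scan in preConsume above.
    func lostReaders(previous, active []string) []string {
        lost := make([]string, 0, len(previous))
    OUTER:
        for _, old := range previous {
            for _, cur := range active {
                if strings.HasPrefix(cur, old) {
                    continue OUTER // still matched this cycle; not lost
                }
            }
            lost = append(lost, old)
        }
        return lost
    }

    func main() {
        prev := []string{"AAA", "BBB"}
        act := []string{"AAAx"} // "AAA" grew; "BBB" rotated out of the pattern
        fmt.Println(lostReaders(prev, act)) // [BBB]
    }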
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/file_test.go
@@ -1144,7 +1144,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 	require.NoError(t, longFile.Close())

 	// Verify we have no checkpointed files
-	require.Equal(t, 0, len(operator.knownFiles))
+	require.Equal(t, 0, operator.knownFiles.Len())

 	// Wait until the only line in the short file and
 	// at least one line from the long file have been consumed
@@ -1286,7 +1286,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
 	operator.wg.Wait()
 	if runtime.GOOS != "windows" {
 		// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
-		require.Len(t, operator.previousPollFiles, 1)
+		require.Equal(t, 1, operator.previousPollFiles.Len())
 	}

 	// keep appending data to file1 and file2
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/file_windows.go
@@ -17,5 +17,5 @@ func (m *Manager) preConsume(_ context.Context, _ []*reader.Reader) {
 // On windows, we close files immediately after reading because they cannot be moved while open.
-func (m *Manager) postConsume(readers []*reader.Reader) {
-	m.previousPollFiles = readers
+func (m *Manager) postConsume() {
+	m.previousPollFiles.Reset(m.activeFiles.Get()...)
 	m.closePreviousFiles()
 }
17 changes: 10 additions & 7 deletions pkg/stanza/fileconsumer/internal/fileset/fileset.go
@@ -39,15 +39,18 @@ func (set *Fileset[T]) Get() []T {
 	return set.readers
 }

-func (set *Fileset[T]) Pop() (T, error) {
-	// return first element from the array and remove it
-	var val T
-	if len(set.readers) == 0 {
-		return val, errFilesetEmpty
+func (set *Fileset[T]) PopN(n int) ([]T, error) {
+	// remove the oldest n elements and return them
+	if n <= 0 {
+		return nil, errors.New("n should be positive")
+	}
+	if n > len(set.readers) { // also guards the empty set
+		return nil, errFilesetEmpty
 	}
-	r := set.readers[0]
-	set.readers = slices.Delete(set.readers, 0, 1)
-	return r, nil
+	arr := make([]T, n)
+	copy(arr, set.readers[:n])
+	set.readers = slices.Delete(set.readers, 0, n)
+	return arr, nil
 }

 func (set *Fileset[T]) Add(readers ...T) {
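Taken together, Fileset is a small ordered collection with FIFO trimming; closePreviousFiles uses PopN to cap knownFiles at four poll cycles' worth of checkpoints. Below is a minimal, self-contained sketch covering only the operations this diff shows (New, Add, Len, PopN); the real internal/fileset type constrains T to a Matchable interface and also offers Get, Reset, and Match, so treat this as an illustration rather than the actual API:

    package main

    import (
        "errors"
        "fmt"
        "slices"
    )

    var errFilesetEmpty = errors.New("pop() on empty Fileset")

    // Fileset keeps elements in insertion order, oldest first.
    type Fileset[T any] struct{ readers []T }

    func New[T any](capacity int) *Fileset[T] {
        return &Fileset[T]{readers: make([]T, 0, capacity)}
    }

    func (set *Fileset[T]) Len() int { return len(set.readers) }

    func (set *Fileset[T]) Add(readers ...T) {
        set.readers = append(set.readers, readers...)
    }

    // PopN removes the n oldest elements and returns them, with the same
    // bounds checks as the diff above.
    func (set *Fileset[T]) PopN(n int) ([]T, error) {
        if n <= 0 {
            return nil, errors.New("n should be positive")
        }
        if n > len(set.readers) {
            return nil, errFilesetEmpty
        }
        arr := make([]T, n)
        copy(arr, set.readers[:n])
        set.readers = slices.Delete(set.readers, 0, n)
        return arr, nil
    }

    func main() {
        set := New[string](4)
        set.Add("a", "b", "c")
        oldest, err := set.PopN(2)
        fmt.Println(oldest, set.Len(), err) // [a b] 1 <nil>
    }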
18 changes: 10 additions & 8 deletions pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
@@ -30,12 +30,15 @@ func push[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) {
 	}
 }

-func pop[T Matchable](expectedErr error, expectedElemet T) func(t *testing.T, fileset *Fileset[T]) {
+func pop[T Matchable](n int, expectedErr error, expectedElements ...T) func(t *testing.T, fileset *Fileset[T]) {
 	return func(t *testing.T, fileset *Fileset[T]) {
-		el, err := fileset.Pop()
+		prevLen := fileset.Len()
+		el, err := fileset.PopN(n)
 		if expectedErr == nil {
 			require.NoError(t, err)
-			require.Equal(t, el, expectedElemet)
+			require.ElementsMatch(t, el, expectedElements)
+			require.Equal(t, n, len(el))
+			require.Equal(t, prevLen-n, fileset.Len())
 		} else {
 			require.ErrorIs(t, err, expectedErr)
 		}
@@ -102,13 +105,12 @@ func TestFilesetReader(t *testing.T) {
 			name: "test_pop",
 			ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){
 				push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),
-				pop(nil, newReader([]byte("ABCDEF"))),
-				pop(nil, newReader([]byte("QWERT"))),
-				pop(errFilesetEmpty, newReader([]byte(""))),
+				pop(2, nil, newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),
+				pop(1, errFilesetEmpty, newReader([]byte(""))),

 				reset(newReader([]byte("XYZ"))),
-				pop(nil, newReader([]byte("XYZ"))),
-				pop(errFilesetEmpty, newReader([]byte(""))),
+				pop(1, nil, newReader([]byte("XYZ"))),
+				pop(1, errFilesetEmpty, newReader([]byte(""))),
 			},
 		},
 	}
