Skip to content

Commit

Permalink
fix: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Vihas Splunk committed Jan 23, 2024
1 parent dc2aa54 commit 6f94284
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
18 changes: 8 additions & 10 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,20 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// been read this polling interval
func (m *Manager) makeReaders(paths []string) {
m.activeFiles.Clear()
OUTER:
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
for _, r := range m.activeFiles.Get() {
if fp.Equal(r.Fingerprint) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue OUTER
if r := m.activeFiles.Match(fp, fileset.Equal); r != nil {
// re-add the reader as Match() removes duplicates
m.activeFiles.Add(r)
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue
}

r, err := m.newReader(file, fp)
Expand All @@ -246,12 +244,12 @@ OUTER:

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.previousPollFiles.Match(fp); oldReader != nil {
if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

// Iterate backwards to match newest first
if oldMetadata := m.knownFiles.Match(fp); oldMetadata != nil {
if oldMetadata := m.knownFiles.Match(fp, fileset.StartsWith); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/file_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) preConsume(ctx context.Context) {
lostReaders := make([]*reader.Reader, 0, len(previousPollFiles))
lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
OUTER:
for _, oldReader := range m.previousPollFiles.Get() {
for _, newReader := range m.activeFiles.Get() {
Expand Down
13 changes: 11 additions & 2 deletions pkg/stanza/fileconsumer/internal/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,22 @@ func (set *Fileset[T]) Reset(readers ...T) []T {
return arr
}

func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint) T {
func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T {
var val T
for idx, r := range set.readers {
if fp.StartsWith(r.GetFingerprint()) {
if cmp(fp, r.GetFingerprint()) {
set.readers = append(set.readers[:idx], set.readers[idx+1:]...)
return r
}
}
return val
}

// Comparator functions intended for use as the cmp argument of Fileset.Match.

// StartsWith reports whether fingerprint a starts with fingerprint b,
// delegating to (*fingerprint.Fingerprint).StartsWith.
func StartsWith(a, b *fingerprint.Fingerprint) bool {
	return a.StartsWith(b)
}

// Equal reports whether fingerprints a and b are identical,
// delegating to (*fingerprint.Fingerprint).Equal.
func Equal(a, b *fingerprint.Fingerprint) bool {
	return a.Equal(b)
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func reset[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) {
func match[T Matchable](ele T, expect bool) func(t *testing.T, fileset *Fileset[T]) {
return func(t *testing.T, fileset *Fileset[T]) {
pr := fileset.Len()
r := fileset.Match(ele.GetFingerprint())
r := fileset.Match(ele.GetFingerprint(), StartsWith)
if expect {
require.NotNil(t, r)
require.Equal(t, pr-1, fileset.Len())
Expand Down

0 comments on commit 6f94284

Please sign in to comment.