Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filename_recall_period parameter to file_input #440

Merged
merged 4 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ func init() {
}

const (
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 512
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 512
defaultFilenameRecallPeriod = time.Minute
defaultPollInterval = 200 * time.Millisecond
)

// NewInputConfig creates a new input config with default values
func NewInputConfig(operatorID string) *InputConfig {
return &InputConfig{
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
PollInterval: helper.Duration{Duration: defaultPollInterval},
IncludeFileName: true,
IncludeFilePath: false,
IncludeFileNameResolved: false,
Expand All @@ -34,6 +36,7 @@ func NewInputConfig(operatorID string) *InputConfig {
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: helper.NewEncodingConfig(),
FilenameRecallPeriod: helper.Duration{Duration: defaultFilenameRecallPeriod},
}
}

Expand All @@ -55,6 +58,7 @@ type InputConfig struct {
DeleteAfterRead bool `json:"delete_after_read,omitempty" yaml:"delete_after_read,omitempty"`
LabelRegex string `json:"label_regex,omitempty" yaml:"label_regex,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
FilenameRecallPeriod helper.Duration `json:"filename_recall_period,omitempty" yaml:"filename_recall_period,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -184,7 +188,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
SeenPaths: make(map[string]time.Time, 100),
filenameRecallPeriod: c.FilenameRecallPeriod.Raw(),
}

return []operator.Operator{op}, nil
Expand Down
13 changes: 11 additions & 2 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type InputOperator struct {
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
SeenPaths map[string]time.Time
filenameRecallPeriod time.Duration

persist helper.Persister

Expand Down Expand Up @@ -191,15 +192,23 @@ OUTER:
// been read this polling interval
func (f *InputOperator) makeReaders(ctx context.Context, filePaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
now := time.Now()
cutoff := now.Add(f.filenameRecallPeriod * -1)
for filename, lastSeenTime := range f.SeenPaths {
if lastSeenTime.Before(cutoff) {
delete(f.SeenPaths, filename)
}
}

files := make([]*os.File, 0, len(filePaths))
for _, path := range filePaths {
if _, ok := f.SeenPaths[path]; !ok {
f.SeenPaths[path] = now
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path) // #nosec - operator must read in files defined by user
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,53 @@ func TestDeleteAfterRead(t *testing.T) {
}
}

func TestFilenameRecallPeriod(t *testing.T) {
t.Parallel()

operator, _, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.FilenameRecallPeriod = helper.Duration{
// start large, but this will be modified
Duration: time.Minute,
}
// so file only exist for first poll
cfg.DeleteAfterRead = true
}, nil)

// Create some new files
temp1 := openTemp(t, tempDir)
writeString(t, temp1, stringWithLength(10))
temp1.Close()

temp2 := openTemp(t, tempDir)
writeString(t, temp2, stringWithLength(20))
temp2.Close()

require.Equal(t, 0, len(operator.SeenPaths))

// Poll once and validate that the files are remembered
operator.poll(context.Background())
defer operator.Stop()
require.Equal(t, 2, len(operator.SeenPaths))
require.Contains(t, operator.SeenPaths, temp1.Name())
require.Contains(t, operator.SeenPaths, temp2.Name())

// Poll again to validate the files are still remembered
operator.poll(context.Background())
require.Equal(t, 2, len(operator.SeenPaths))
require.Contains(t, operator.SeenPaths, temp1.Name())
require.Contains(t, operator.SeenPaths, temp2.Name())

time.Sleep(100 * time.Millisecond)
// Hijack the recall period to trigger a purge
operator.filenameRecallPeriod = time.Millisecond

// Poll a final time
operator.poll(context.Background())

// SeenPaths has been purged of ancient files
require.Equal(t, 0, len(operator.SeenPaths))
}

func TestFileReader_FingerprintUpdated(t *testing.T) {
t.Parallel()

Expand Down