diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 502486b54..f82428402 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -16,15 +16,17 @@ func init() { } const ( - defaultMaxLogSize = 1024 * 1024 - defaultMaxConcurrentFiles = 512 + defaultMaxLogSize = 1024 * 1024 + defaultMaxConcurrentFiles = 512 + defaultFilenameRecallPeriod = time.Minute + defaultPollInterval = 200 * time.Millisecond ) // NewInputConfig creates a new input config with default values func NewInputConfig(operatorID string) *InputConfig { return &InputConfig{ InputConfig: helper.NewInputConfig(operatorID, "file_input"), - PollInterval: helper.Duration{Duration: 200 * time.Millisecond}, + PollInterval: helper.Duration{Duration: defaultPollInterval}, IncludeFileName: true, IncludeFilePath: false, IncludeFileNameResolved: false, @@ -34,6 +36,7 @@ func NewInputConfig(operatorID string) *InputConfig { MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles, Encoding: helper.NewEncodingConfig(), + FilenameRecallPeriod: helper.Duration{Duration: defaultFilenameRecallPeriod}, } } @@ -55,6 +58,7 @@ type InputConfig struct { DeleteAfterRead bool `json:"delete_after_read,omitempty" yaml:"delete_after_read,omitempty"` LabelRegex string `json:"label_regex,omitempty" yaml:"label_regex,omitempty"` Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"` + FilenameRecallPeriod helper.Duration `json:"filename_recall_period,omitempty" yaml:"filename_recall_period,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -184,7 +188,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, fingerprintSize: int(c.FingerprintSize), MaxLogSize: int(c.MaxLogSize), MaxConcurrentFiles: c.MaxConcurrentFiles, - SeenPaths: make(map[string]struct{}, 100), + SeenPaths: make(map[string]time.Time, 100), + filenameRecallPeriod: c.FilenameRecallPeriod.Raw(), } return []operator.Operator{op}, nil diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 48b987b77..400ab90b2 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -29,7 +29,8 @@ type InputOperator struct { SplitFunc bufio.SplitFunc MaxLogSize int MaxConcurrentFiles int - SeenPaths map[string]struct{} + SeenPaths map[string]time.Time + filenameRecallPeriod time.Duration persist helper.Persister @@ -191,15 +192,23 @@ OUTER: // been read this polling interval func (f *InputOperator) makeReaders(ctx context.Context, filePaths []string) []*Reader { // Open the files first to minimize the time between listing and opening + now := time.Now() + cutoff := now.Add(f.filenameRecallPeriod * -1) + for filename, lastSeenTime := range f.SeenPaths { + if lastSeenTime.Before(cutoff) { + delete(f.SeenPaths, filename) + } + } + files := make([]*os.File, 0, len(filePaths)) for _, path := range filePaths { if _, ok := f.SeenPaths[path]; !ok { + f.SeenPaths[path] = now if f.startAtBeginning { f.Infow("Started watching file", "path", path) } else { f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) } - f.SeenPaths[path] = struct{}{} } file, err := os.Open(path) // #nosec - operator must read in files defined by user if err != nil { diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index d97451d73..9fb02e506 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -638,6 +638,53 @@ func TestDeleteAfterRead(t *testing.T) { } } +func TestFilenameRecallPeriod(t *testing.T) { + t.Parallel() + + operator, _, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { + cfg.FilenameRecallPeriod = helper.Duration{ + // start large, but this will be modified + Duration: time.Minute, + } + // so file only exist for first poll + cfg.DeleteAfterRead = true + }, nil) + + // Create some new files + temp1 := openTemp(t, tempDir) + writeString(t, temp1, stringWithLength(10)) + temp1.Close() + + temp2 := openTemp(t, tempDir) + writeString(t, temp2, stringWithLength(20)) + temp2.Close() + + require.Equal(t, 0, len(operator.SeenPaths)) + + // Poll once and validate that the files are remembered + operator.poll(context.Background()) + defer operator.Stop() + require.Equal(t, 2, len(operator.SeenPaths)) + require.Contains(t, operator.SeenPaths, temp1.Name()) + require.Contains(t, operator.SeenPaths, temp2.Name()) + + // Poll again to validate the files are still remembered + operator.poll(context.Background()) + require.Equal(t, 2, len(operator.SeenPaths)) + require.Contains(t, operator.SeenPaths, temp1.Name()) + require.Contains(t, operator.SeenPaths, temp2.Name()) + + time.Sleep(100 * time.Millisecond) + // Hijack the recall period to trigger a purge + operator.filenameRecallPeriod = time.Millisecond + + // Poll a final time + operator.poll(context.Background()) + + // SeenPaths has been purged of ancient files + require.Equal(t, 0, len(operator.SeenPaths)) +} + func TestFileReader_FingerprintUpdated(t *testing.T) { t.Parallel()