diff --git a/.chloggen/add_filelog_logging.yaml b/.chloggen/add_filelog_logging.yaml new file mode 100644 index 000000000000..d643edf312e5 --- /dev/null +++ b/.chloggen/add_filelog_logging.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Log when files are rotated/moved/truncated + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33237] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/fileconsumer/design.md b/pkg/stanza/fileconsumer/design.md index 34d7fee70063..23100d22cd21 100644 --- a/pkg/stanza/fileconsumer/design.md +++ b/pkg/stanza/fileconsumer/design.md @@ -206,6 +206,40 @@ When the operator shuts down, the following occurs: The net effect of the shut down routine is that all files are checkpointed in a normal manner (i.e. not in the middle of a log entry), and all checkpoints are persisted. 
+### Log rotation + +#### Supported cases + +A) When a file is moved within the pattern with unread logs on the end, then the original is created again, + we get the unread logs on the moved file as well as any new logs written to the newly created file. + +B) When a file is copied within the pattern with unread logs on the end, then the original is truncated, + we get the unread logs on the copy as well as any new logs written to the truncated file. + +C) When a file is rotated out of pattern via move/create, we detect that + our old handle is still valid and we attempt to read from it. + +D) When a file is rotated out of pattern via copy/truncate, we detect that + our old handle is invalid and we do not attempt to read from it. + + +#### Rotated files that end up within the matching pattern + +In both cases of copy/truncate and move/create, if the rotated files match the pattern +then the old readers that point to the original path will be closed and we will create new +ones which will be pointing to the rotated file but using the existing metadata's offset. +The receiver will continue consuming the rotated paths in any case so there will be +no data loss during the transition. +The original files will have a fresh fingerprint so they will be consumed by a completely +new reader. + +#### Rotated files that end up out of the matching pattern + +In case a file has been rotated with move/create, the old handle will be pointing +to the moved file so we can still consume from it even if it's out of the pattern. +In case the file has been rotated with copy/truncate, the old handle will be pointing +to the original file which is truncated. So we don't have a handle in order to consume any remaining +logs from the copied file. This can cause data loss. 
# Known Limitations diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index acb0f47629b1..517526ebe503 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -208,6 +208,7 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) { // Exclude duplicate paths with the same content. This can happen when files are // being rotated with copy/truncate strategy. (After copy, prior to truncate.) if r := m.tracker.GetCurrentFile(fp); r != nil { + m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name())) // re-add the reader as Match() removes duplicates m.tracker.Add(r) if err := file.Close(); err != nil { @@ -229,6 +230,19 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) { func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) { // Check previous poll cycle for match if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil { + if oldReader.GetFileName() != file.Name() { + if !oldReader.Validate() { + m.set.Logger.Debug( + "File has been rotated(truncated)", + zap.String("original_path", oldReader.GetFileName()), + zap.String("rotated_path", file.Name())) + } else { + m.set.Logger.Debug( + "File has been rotated(moved)", + zap.String("original_path", oldReader.GetFileName()), + zap.String("rotated_path", file.Name())) + } + } return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) } diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index ac45d395faa6..5acc955e8448 100644 --- a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -9,6 +9,8 @@ import ( "context" "sync" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) @@ -34,13 +36,17 @@ OUTER: continue } - // At this point, we know that the file has been rotated. 
However, we do not know - // if it was moved or truncated. If truncated, then both handles point to the same - // file, in which case we should only read from it using the new reader. We can use + // At this point, we know that the file has been rotated out of the matching pattern. + // However, we do not know if it was moved or truncated. + // If truncated, then both handles point to the same file, in which case + // we should only read from it using the new reader. We can use // the Validate method to ensure that the file has not been truncated. if !oldReader.Validate() { + m.set.Logger.Debug("File has been rotated(truncated)", zap.String("path", oldReader.GetFileName())) continue OUTER } + // oldreader points to the rotated file after the move/rename. We can still read from it. + m.set.Logger.Debug("File has been rotated(moved)", zap.String("path", oldReader.GetFileName())) } lostReaders = append(lostReaders, oldReader) } @@ -48,6 +54,7 @@ OUTER: var lostWG sync.WaitGroup for _, lostReader := range lostReaders { lostWG.Add(1) + m.set.Logger.Debug("Reading lost file", zap.String("path", lostReader.GetFileName())) go func(r *reader.Reader) { defer lostWG.Done() m.readingFiles.Add(ctx, 1) diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 9fa22b0e2fff..372f42e48d3d 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -184,6 +184,10 @@ func (r *Reader) Validate() bool { return false } +func (r *Reader) GetFileName() string { + return r.fileName +} + func (m Metadata) GetFingerprint() *fingerprint.Fingerprint { return m.Fingerprint } diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 008bb01bb71f..2c8912cc6eb7 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -14,7 +14,10 @@ import ( "testing" "time" + 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -215,6 +218,9 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) + core, observedLogs := observer.New(zap.DebugLevel) + logger := zap.New(core) + operator.set.Logger = logger originalFile := filetest.OpenTemp(t, tempDir) orginalName := originalFile.Name() @@ -240,6 +246,16 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { filetest.WriteString(t, newFile, "testlog3\n") sink.ExpectTokens(t, []byte("testlog2"), []byte("testlog3")) + + // verify that proper logging has taken place + allLogs := observedLogs.All() + foundLog := false + for _, actualLog := range allLogs { + if actualLog.Message == "File has been rotated(moved)" { + foundLog = true + } + } + assert.True(t, foundLog) } // When a file it rotated out of pattern via move/create, we should @@ -256,6 +272,9 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() + core, observedLogs := observer.New(zap.DebugLevel) + logger := zap.New(core) + operator.set.Logger = logger originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1") originalFileName := originalFile.Name() @@ -280,6 +299,20 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) { // expect remaining log from old file as well as all from new file sink.ExpectTokens(t, []byte("testlog2"), []byte("testlog4"), []byte("testlog5")) + + // verify that proper logging has taken place + allLogs := observedLogs.All() + expectedLogs := map[string]string{ + "File has been rotated(moved)": "", + "Reading lost file": 
"", + } + foundLogs := 0 + for _, actualLog := range allLogs { + if _, ok := expectedLogs[actualLog.Message]; ok { + foundLogs++ + } + } + assert.Equal(t, 2, foundLogs) } // When a file it rotated out of pattern via copy/truncate, we should @@ -293,6 +326,9 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() + core, observedLogs := observer.New(zap.DebugLevel) + logger := zap.New(core) + operator.set.Logger = logger originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1") filetest.WriteString(t, originalFile, "testlog1\n") @@ -318,6 +354,16 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) { operator.poll(context.Background()) sink.ExpectTokens(t, []byte("testlog4"), []byte("testlog5")) + + // verify that proper logging has taken place + allLogs := observedLogs.All() + foundLog := false + for _, actualLog := range allLogs { + if actualLog.Message == "File has been rotated(truncated)" { + foundLog = true + } + } + assert.True(t, foundLog) } // TruncateThenWrite tests that, after a file has been truncated, @@ -333,6 +379,9 @@ func TestTruncateThenWrite(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() + core, observedLogs := observer.New(zap.DebugLevel) + logger := zap.New(core) + operator.set.Logger = logger temp1 := filetest.OpenTemp(t, tempDir) filetest.WriteString(t, temp1, "testlog1\ntestlog2\n") @@ -348,6 +397,16 @@ func TestTruncateThenWrite(t *testing.T) { operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog3")) sink.ExpectNoCalls(t) + + // verify that proper logging has taken place + allLogs := observedLogs.All() + foundLog := false + for _, actualLog := range allLogs { + if actualLog.Message == "File has been rotated(truncated)" { + foundLog = true + } + } + assert.True(t, foundLog) } // CopyTruncateWriteBoth 
tests that when a file is copied