diff --git a/.chloggen/pkg-stanza-fileconsumer-darfix.yaml b/.chloggen/pkg-stanza-fileconsumer-darfix.yaml new file mode 100755 index 000000000000..e76ce6eedd85 --- /dev/null +++ b/.chloggen/pkg-stanza-fileconsumer-darfix.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where delete_after_read would cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31383] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index edc4f89cb004..4f3267d749b8 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -67,7 +67,6 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) closePreviousFiles() { // m.previousPollFiles -> m.knownFiles[0] - for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() { m.knownFiles[0].Add(r.Close()) } @@ -171,7 +170,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.Debug("Consuming files", zap.Strings("paths", paths)) m.makeReaders(paths) - m.preConsume(ctx) + m.readLostFiles(ctx) // read new readers to end var wg sync.WaitGroup diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index cfa0c80fae09..daef0f312484 100644 --- a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -15,7 +15,12 @@ import ( // Take care of files which disappeared from the pattern since the last poll cycle // this can mean either files which were removed, or rotated into a name not matching the pattern // we do this before reading existing files to ensure we emit older log lines before newer ones -func (m *Manager) preConsume(ctx context.Context) { +func (m *Manager) readLostFiles(ctx context.Context) { + if m.readerFactory.DeleteAtEOF { + // Lost files are not expected when delete_at_eof is enabled + // since we are deleting the files before they can become lost. + return + } lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len()) OUTER: for _, oldReader := range m.previousPollFiles.Get() { diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 74ccb5839e80..52418ae7ce75 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -985,16 +985,14 @@ func TestDeleteAfterRead(t *testing.T) { linesPerFile := 10 totalLines := files * linesPerFile - expectedTokens := make([][]byte, 0, totalLines) - actualTokens := make([][]byte, 0, totalLines) - tempDir := t.TempDir() temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { temps = append(temps, filetest.OpenTemp(t, tempDir)) } - // Write logs to each file + expectedTokens := make([][]byte, 0, totalLines) + actualTokens := make([][]byte, 0, totalLines) for i, temp := range temps { for j := 0; j < linesPerFile; j++ { line := filetest.TokenWithLength(100) @@ -1023,6 +1021,35 @@ func TestDeleteAfterRead(t *testing.T) { _, err := os.Stat(temp.Name()) require.True(t, os.IsNotExist(err)) } + + // Make more files to ensure deleted files do not cause problems on next poll + temps = make([]*os.File, 0, files) + for i := 0; i < files; i++ { + temps = append(temps, filetest.OpenTemp(t, tempDir)) + } + + expectedTokens = make([][]byte, 0, totalLines) + actualTokens = make([][]byte, 0, totalLines) + for i, temp := range temps { + for j := 0; j < linesPerFile; j++ { + line := filetest.TokenWithLength(200) + message := fmt.Sprintf("%s %d %d", line, i, j) + _, err := temp.WriteString(message + "\n") + require.NoError(t, err) + expectedTokens = append(expectedTokens, []byte(message)) + } + require.NoError(t, temp.Close()) + } + + operator.poll(context.Background()) + actualTokens = append(actualTokens, sink.NextTokens(t, totalLines)...) + + require.ElementsMatch(t, expectedTokens, actualTokens) + + for _, temp := range temps { + _, err := os.Stat(temp.Name()) + require.True(t, os.IsNotExist(err)) + } } func TestMaxBatching(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/file_windows.go b/pkg/stanza/fileconsumer/file_windows.go index 5481278301dd..8eed8a967bd5 100644 --- a/pkg/stanza/fileconsumer/file_windows.go +++ b/pkg/stanza/fileconsumer/file_windows.go @@ -9,7 +9,8 @@ import ( "context" ) -func (m *Manager) preConsume(ctx context.Context) { +// Noop on windows because we close files immediately after reading. +func (m *Manager) readLostFiles(ctx context.Context) { } // On windows, we close files immediately after reading because they cannot be moved while open. diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 529061723e5b..3c1111a43e2b 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -114,7 +114,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { // Delete will close and delete the file func (r *Reader) delete() { - r.Close() + r.close() if err := os.Remove(r.fileName); err != nil { r.logger.Errorf("could not delete %s", r.fileName) } @@ -122,6 +122,13 @@ func (r *Reader) delete() { // Close will close the file and return the metadata func (r *Reader) Close() *Metadata { + r.close() + m := r.Metadata + r.Metadata = nil + return m +} + +func (r *Reader) close() { if r.file != nil { if err := r.file.Close(); err != nil { r.logger.Debugw("Problem closing reader", zap.Error(err)) @@ -134,9 +141,6 @@ func (r *Reader) Close() *Metadata { r.logger.Errorw("Failed to stop header pipeline", zap.Error(err)) } } - m := r.Metadata - r.Metadata = nil - return m } // Read from the file and update the fingerprint if necessary