Skip to content

Commit

Permalink
[pkg/stanza] log when files are rotated/moved/truncated (#33237)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This PR adds the logging part from
#31256.
With this addition every time that is identified that file is rotated
either by move/create or copy/truncate, proper logging takes place. 

**Link to tracking Issue:** <Issue number if applicable>
#31256

**Testing:** <Describe what testing was performed and which tests were
added.>
Updated existing unit tests


### How to test this manually

Using the following config:

```yaml
receivers:
  filelog:
    start_at: beginning
    poll_interval: 5s
    include:
    - /var/log/busybox/monitoring/stable*.log

exporters:
  debug:
    verbosity: detailed

service:
  telemetry:
    logs:
      level: info
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [debug]
      processors: []
```

#### Testing truncate (out of pattern)

```console
echo "$(date '+%FT%H:%M:%S.%NZ') some line1" >> /var/log/busybox/monitoring/stable_trunc.log && 
echo "$(date '+%FT%H:%M:%S.%NZ') some line2" >> /var/log/busybox/monitoring/stable_trunc.log &&  
echo "$(date '+%FT%H:%M:%S.%NZ') some line3" >> /var/log/busybox/monitoring/stable_trunc.log && 
sleep 6 && 
cp /var/log/busybox/monitoring/stable_trunc.log /var/log/busybox/monitoring/stable_trunc.log.1 && 
: > /var/log/busybox/monitoring/stable_trunc.log && 
echo "$(date '+%FT%H:%M:%S.%NZ') some line new0" >> /var/log/busybox/monitoring/stable_trunc.log
```

#### Testing truncate (in pattern)

```console
echo "$(date '+%FT%H:%M:%S.%NZ') some line1" >> /var/log/busybox/monitoring/stable_trunc.log &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line2" >> /var/log/busybox/monitoring/stable_trunc.log && 
echo "$(date '+%FT%H:%M:%S.%NZ') some line3" >> /var/log/busybox/monitoring/stable_trunc.log &&
sleep 6 &&
cp /var/log/busybox/monitoring/stable_trunc.log /var/log/busybox/monitoring/stable_trunc_1.log &&
: > /var/log/busybox/monitoring/stable_trunc.log &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line new1" >> /var/log/busybox/monitoring/stable_trunc.log
```

#### Testing move/create  (out of pattern)

```console
echo "$(date '+%FT%H:%M:%S.%NZ') some line1" >> /var/log/busybox/monitoring/stable_trunc.log &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line2" >> /var/log/busybox/monitoring/stable_trunc.log && 
echo "$(date '+%FT%H:%M:%S.%NZ') some line3" >> /var/log/busybox/monitoring/stable_trunc.log &&
sleep 6 &&
mv /var/log/busybox/monitoring/stable_trunc.log /var/log/busybox/monitoring/stable_trunc.log.1 &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line new0" >> /var/log/busybox/monitoring/stable_trunc.log
```

#### Testing move/create (in pattern)

```console
echo "$(date '+%FT%H:%M:%S.%NZ') some line1" >> /var/log/busybox/monitoring/stable_trunc.log &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line2" >> /var/log/busybox/monitoring/stable_trunc.log && 
echo "$(date '+%FT%H:%M:%S.%NZ') some line3" >> /var/log/busybox/monitoring/stable_trunc.log &&
sleep 6 &&
mv /var/log/busybox/monitoring/stable_trunc.log /var/log/busybox/monitoring/stable_trunc_1.log &&
echo "$(date '+%FT%H:%M:%S.%NZ') some line new0" >> /var/log/busybox/monitoring/stable_trunc.log
```

**Documentation:** <Describe the documentation added.>
Add some extra notes in the `design.md`

---------

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark authored May 31, 2024
1 parent 8a91da7 commit 0978757
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_filelog_logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Log when files are rotated/moved/truncated

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33237]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
34 changes: 34 additions & 0 deletions pkg/stanza/fileconsumer/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,40 @@ When the operator shuts down, the following occurs:
The net effect of the shut down routine is that all files are checkpointed in a normal manner
(i.e. not in the middle of a log entry), and all checkpoints are persisted.

### Log rotation

#### Supported cases

A) When a file is moved within the pattern with unread logs on the end, then the original is created again,
we get the unread logs on the moved as well as any new logs written to the newly created file.

B) When a file is copied within the pattern with unread logs on the end, then the original is truncated,
we get the unread logs on the copy as well as any new logs written to the truncated file.

C) When a file it rotated out of pattern via move/create, we detect that
our old handle is still valid and we attempt to read from it.

D) When a file it rotated out of pattern via copy/truncate, we detect that
our old handle is invalid and we do not attempt to read from it.


#### Rotated files that end up within the matching pattern

In both cases of copy/truncate and move/create, if the rotated files match the pattern
then the old readers that point to the original path will be closed and we will create new
ones which will be pointing to the rotated file but using the existing metadata's offset.
The receiver will continue consuming the rotated paths in any case so there will be
no data loss during the transition.
The original files will have a fresh fingerprint so they will be consumed by a completely
new reader.

#### Rotated files that end up out of the matching pattern

In case of a file has been rotated with move/create, the old handle will be pointing
to the moved file so we can still consume from it even if it's out of the pattern.
In case of the file has been rotated with copy/truncate, the old handle will be pointing
to the original file which is truncated. So we don't have a handle in order to consume any remaining
logs from the moved file. This can cause data loss.

# Known Limitations

Expand Down
14 changes: 14 additions & 0 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(fp); r != nil {
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
// re-add the reader as Match() removes duplicates
m.tracker.Add(r)
if err := file.Close(); err != nil {
Expand All @@ -229,6 +230,19 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
if oldReader.GetFileName() != file.Name() {
if !oldReader.Validate() {
m.set.Logger.Debug(
"File has been rotated(truncated)",
zap.String("original_path", oldReader.GetFileName()),
zap.String("rotated_path", file.Name()))
} else {
m.set.Logger.Debug(
"File has been rotated(moved)",
zap.String("original_path", oldReader.GetFileName()),
zap.String("rotated_path", file.Name()))
}
}
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/stanza/fileconsumer/file_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"sync"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

Expand All @@ -34,20 +36,25 @@ OUTER:
continue
}

// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader. We can use
// At this point, we know that the file has been rotated out of the matching pattern.
// However, we do not know if it was moved or truncated.
// If truncated, then both handles point to the same file, in which case
// we should only read from it using the new reader. We can use
// the Validate method to ensure that the file has not been truncated.
if !oldReader.Validate() {
m.set.Logger.Debug("File has been rotated(truncated)", zap.String("path", oldReader.GetFileName()))
continue OUTER
}
// oldreader points to the rotated file after the move/rename. We can still read from it.
m.set.Logger.Debug("File has been rotated(moved)", zap.String("path", oldReader.GetFileName()))
}
lostReaders = append(lostReaders, oldReader)
}

var lostWG sync.WaitGroup
for _, lostReader := range lostReaders {
lostWG.Add(1)
m.set.Logger.Debug("Reading lost file", zap.String("path", lostReader.GetFileName()))
go func(r *reader.Reader) {
defer lostWG.Done()
m.readingFiles.Add(ctx, 1)
Expand Down
4 changes: 4 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (r *Reader) Validate() bool {
return false
}

func (r *Reader) GetFileName() string {
return r.fileName
}

func (m Metadata) GetFingerprint() *fingerprint.Fingerprint {
return m.Fingerprint
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
Expand Down Expand Up @@ -215,6 +218,9 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
core, observedLogs := observer.New(zap.DebugLevel)
logger := zap.New(core)
operator.set.Logger = logger

originalFile := filetest.OpenTemp(t, tempDir)
orginalName := originalFile.Name()
Expand All @@ -240,6 +246,16 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) {
filetest.WriteString(t, newFile, "testlog3\n")

sink.ExpectTokens(t, []byte("testlog2"), []byte("testlog3"))

// verify that proper logging has taken place
allLogs := observedLogs.All()
foundLog := false
for _, actualLog := range allLogs {
if actualLog.Message == "File has been rotated(moved)" {
foundLog = true
}
}
assert.True(t, foundLog)
}

// When a file it rotated out of pattern via move/create, we should
Expand All @@ -256,6 +272,9 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) {
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewUnscopedMockPersister()
core, observedLogs := observer.New(zap.DebugLevel)
logger := zap.New(core)
operator.set.Logger = logger

originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
originalFileName := originalFile.Name()
Expand All @@ -280,6 +299,20 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) {

// expect remaining log from old file as well as all from new file
sink.ExpectTokens(t, []byte("testlog2"), []byte("testlog4"), []byte("testlog5"))

// verify that proper logging has taken place
allLogs := observedLogs.All()
expectedLogs := map[string]string{
"File has been rotated(moved)": "",
"Reading lost file": "",
}
foundLogs := 0
for _, actualLog := range allLogs {
if _, ok := expectedLogs[actualLog.Message]; ok {
foundLogs++
}
}
assert.Equal(t, 2, foundLogs)
}

// When a file it rotated out of pattern via copy/truncate, we should
Expand All @@ -293,6 +326,9 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) {
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewUnscopedMockPersister()
core, observedLogs := observer.New(zap.DebugLevel)
logger := zap.New(core)
operator.set.Logger = logger

originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
filetest.WriteString(t, originalFile, "testlog1\n")
Expand All @@ -318,6 +354,16 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) {
operator.poll(context.Background())

sink.ExpectTokens(t, []byte("testlog4"), []byte("testlog5"))

// verify that proper logging has taken place
allLogs := observedLogs.All()
foundLog := false
for _, actualLog := range allLogs {
if actualLog.Message == "File has been rotated(truncated)" {
foundLog = true
}
}
assert.True(t, foundLog)
}

// TruncateThenWrite tests that, after a file has been truncated,
Expand All @@ -333,6 +379,9 @@ func TestTruncateThenWrite(t *testing.T) {
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewUnscopedMockPersister()
core, observedLogs := observer.New(zap.DebugLevel)
logger := zap.New(core)
operator.set.Logger = logger

temp1 := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp1, "testlog1\ntestlog2\n")
Expand All @@ -348,6 +397,16 @@ func TestTruncateThenWrite(t *testing.T) {
operator.poll(context.Background())
sink.ExpectToken(t, []byte("testlog3"))
sink.ExpectNoCalls(t)

// verify that proper logging has taken place
allLogs := observedLogs.All()
foundLog := false
for _, actualLog := range allLogs {
if actualLog.Message == "File has been rotated(truncated)" {
foundLog = true
}
}
assert.True(t, foundLog)
}

// CopyTruncateWriteBoth tests that when a file is copied
Expand Down

0 comments on commit 0978757

Please sign in to comment.