Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for truncated and symlinked files #24425

Merged
merged 3 commits into from
Apr 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"os"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
cancelInput()
env.waitUntilInputStops()
}

// test_symlinks_enabled from test_harvester.py
func TestFilestreamSymlinksEnabled(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
urso marked this conversation as resolved.
Show resolved Hide resolved
},
"prospector.scanner.symlinks": "true",
})

testlines := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines))
}

// test_symlink_rotated from test_harvester.py
func TestFilestreamSymlinkRotated(t *testing.T) {
env := newInputTestingEnvironment(t)

firstTestlogName := "test1.log"
secondTestlogName := "test2.log"
symlinkName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

commonLine := "first line in file "
for i, path := range []string{firstTestlogName, secondTestlogName} {
env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n"))
}

env.mustSymlink(firstTestlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

expectedOffset := len(commonLine) + 2
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)

// rotate symlink
env.mustRemoveFile(symlinkName)
env.mustSymlink(secondTestlogName, symlinkName)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I would not expect the symlink to be removed when rotating it. Just change the location it points to. What would happen in that case? What would happen if the new log file is larger then the old one when we update the link?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just ported blindly the test from the original Python one. There should not be any change in behaviour of Filebeat if the symlink is renamed.

If the log file is bigger than the previous, Filebeat cannot detect truncation regardless of what we do with the file symlink, rename, remove and create again. That is why we need the special copytruncate prospector.

Copy link

@urso urso Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So we do track the inode of the symlink file, and not the inode of the target file.

I have just ported blindly the test from the original Python one. There should not be any change in behaviour of Filebeat if the symlink is renamed.

I still would expect a slight difference. If we remove the symlink file before, then the symlink is a new file with a new inode. If we just overwrite the symlink with a new file the inode is not changed and filebeat would/should see it as a truncation event. Also the observable result (collect from offset 0) should be the same, the internal handling of these 2 cases differ, right? So it would be nice to have additonal tests for symlinks, but I would say these cases can be tested as unit tests in the prospector or file watcher.

I guess we need to rethink symlink support to some extent. Detecting changes on a symlink should not only depend on the file size. Have we also considered the case of a symlink becoming an actual file (or the other way around)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK if I add a test for the scenario you had described in a follow up PR? You are raising valid points. However, we should decide what we want to tackle in the default prospector and what we want to cover in the copytruncate prospector. I would rather give us a bit more time for that and address your concerns in a new PR. WDYT?


moreLines := "second line in file 2\nthird line in file 2\n"
env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines))

env.waitUntilEventCount(4)
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)
env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(2)
}

// test_symlink_removed from test_harvester.py
func TestFilestreamSymlinkRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

line := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, line)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(line))

// remove symlink
env.mustRemoveFile(symlinkName)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if I don't remove/update the symlink, but the file the symlink points to? Do we even detect that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the reader opens the original file and tracks everything the same way as done in ordinary files.


env.mustAppendLinesToFile(testlogName, line)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, 2*len(line))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if we have close...removed: true? Will remove act on the symlink state, or on the resolved file?

What happens on close...renamed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filebeat cares about the original file, not the symlink. The setting has no impact on the behaviour of the reader if the symlink is deleted. I am not sure why the original test contains it.


cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_truncate from test_harvester.py
func TestFilestreamTruncate(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
})

lines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, lines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)

env.requireOffsetInRegistry(testlogName, len(lines))

// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, 0)
kvch marked this conversation as resolved.
Show resolved Hide resolved

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteLinesToFile(testlogName, moreLines)

env.waitUntilEventCount(5)
env.requireOffsetInRegistry(testlogName, len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}