diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index c768abfa32b..ab9ed187bd7 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -24,6 +24,7 @@ import ( "context" "os" "runtime" + "strconv" "testing" "time" @@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { cancelInput() env.waitUntilInputStops() } + +// test_symlinks_enabled from test_harvester.py +func TestFilestreamSymlinksEnabled(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(symlinkName), + }, + "prospector.scanner.symlinks": "true", + }) + + testlines := []byte("first line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + + cancelInput() + env.waitUntilInputStops() + + env.requireOffsetInRegistry(testlogName, len(testlines)) +} + +// test_symlink_rotated from test_harvester.py +func TestFilestreamSymlinkRotated(t *testing.T) { + env := newInputTestingEnvironment(t) + + firstTestlogName := "test1.log" + secondTestlogName := "test2.log" + symlinkName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(symlinkName), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + }) + + commonLine := "first line in file " + for i, path := range []string{firstTestlogName, secondTestlogName} { + env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n")) + } + + env.mustSymlink(firstTestlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + + expectedOffset := len(commonLine) + 2 + env.requireOffsetInRegistry(firstTestlogName, expectedOffset) + + // rotate symlink + env.mustRemoveFile(symlinkName) + env.mustSymlink(secondTestlogName, symlinkName) + + moreLines := "second line in file 2\nthird line in file 2\n" + env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines)) + + env.waitUntilEventCount(4) + env.requireOffsetInRegistry(firstTestlogName, expectedOffset) + env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(2) +} + +// test_symlink_removed from test_harvester.py +func TestFilestreamSymlinkRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(symlinkName), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + }) + + line := []byte("first line\n") + env.mustWriteLinesToFile(testlogName, line) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + + env.requireOffsetInRegistry(testlogName, len(line)) + + // remove symlink + env.mustRemoveFile(symlinkName) + + env.mustAppendLinesToFile(testlogName, line) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, 2*len(line)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(1) +} + +// test_truncate from test_harvester.py +func TestFilestreamTruncate(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath("*"), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + }) + + lines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, lines) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + + env.requireOffsetInRegistry(testlogName, len(lines)) + + // remove symlink + env.mustRemoveFile(symlinkName) + env.mustTruncateFile(testlogName, 0) + env.waitUntilOffsetInRegistry(testlogName, 0) + + // recreate symlink + env.mustSymlink(testlogName, symlinkName) + + moreLines := []byte("forth line\nfifth line\n") + env.mustWriteLinesToFile(testlogName, moreLines) + + env.waitUntilEventCount(5) + env.requireOffsetInRegistry(testlogName, len(moreLines)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(1) +}