Skip to content

Commit

Permalink
Add tests for truncated and symlinked files in filestream input (elas…
Browse files Browse the repository at this point in the history
…tic#24425)

(cherry picked from commit a9279cd)
  • Loading branch information
kvch committed Apr 15, 2021
1 parent 9b82b2a commit 92b81be
Showing 1 changed file with 167 additions and 0 deletions.
167 changes: 167 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"os"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
cancelInput()
env.waitUntilInputStops()
}

// test_symlinks_enabled from test_harvester.py
func TestFilestreamSymlinksEnabled(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.symlinks": "true",
})

testlines := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines))
}

// test_symlink_rotated from test_harvester.py
func TestFilestreamSymlinkRotated(t *testing.T) {
env := newInputTestingEnvironment(t)

firstTestlogName := "test1.log"
secondTestlogName := "test2.log"
symlinkName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

commonLine := "first line in file "
for i, path := range []string{firstTestlogName, secondTestlogName} {
env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n"))
}

env.mustSymlink(firstTestlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

expectedOffset := len(commonLine) + 2
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)

// rotate symlink
env.mustRemoveFile(symlinkName)
env.mustSymlink(secondTestlogName, symlinkName)

moreLines := "second line in file 2\nthird line in file 2\n"
env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines))

env.waitUntilEventCount(4)
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)
env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(2)
}

// test_symlink_removed from test_harvester.py
func TestFilestreamSymlinkRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
})

line := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, line)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(line))

// remove symlink
env.mustRemoveFile(symlinkName)

env.mustAppendLinesToFile(testlogName, line)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, 2*len(line))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_truncate from test_harvester.py
func TestFilestreamTruncate(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
})

lines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, lines)

env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)

env.requireOffsetInRegistry(testlogName, len(lines))

// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, 0)

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteLinesToFile(testlogName, moreLines)

env.waitUntilEventCount(5)
env.requireOffsetInRegistry(testlogName, len(moreLines))

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

0 comments on commit 92b81be

Please sign in to comment.