Skip to content

Commit

Permalink
fix(promtail): remove flaky TestFileTarget_StopsTailersCleanly (#16473)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 26, 2025
1 parent 5335a21 commit 2ab63d2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 115 deletions.
6 changes: 2 additions & 4 deletions clients/pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ type WatchConfig struct {
MaxPollFrequency time.Duration `mapstructure:"max_poll_frequency" yaml:"max_poll_frequency"`
}

var DefaultWatchConig = WatchConfig{
var DefaultWatchConfig = WatchConfig{
MinPollFrequency: 250 * time.Millisecond,
MaxPollFrequency: 250 * time.Millisecond,
}

// RegisterFlags with prefix registers flags where every name is prefixed by
// prefix. If prefix is a non-empty string, prefix should end with a period.
func (cfg *WatchConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
d := DefaultWatchConig
d := DefaultWatchConfig

f.DurationVar(&cfg.MinPollFrequency, prefix+"min_poll_frequency", d.MinPollFrequency, "Minimum period to poll for file changes")
f.DurationVar(&cfg.MaxPollFrequency, prefix+"max_poll_frequency", d.MaxPollFrequency, "Maximum period to poll for file changes")
Expand Down Expand Up @@ -247,7 +247,6 @@ func (t *FileTarget) sync() error {
} else {
// Gets current list of files to tail.
matches, err = doublestar.FilepathGlob(t.path)

if err != nil {
return errors.Wrap(err, "filetarget.sync.filepath.Glob")
}
Expand All @@ -257,7 +256,6 @@ func (t *FileTarget) sync() error {
matchesExcluded = []string{t.pathExclude}
} else {
matchesExcluded, err = doublestar.FilepathGlob(t.pathExclude)

if err != nil {
return errors.Wrap(err, "filetarget.sync.filepathexclude.Glob")
}
Expand Down
118 changes: 13 additions & 105 deletions clients/pkg/promtail/targets/file/filetarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -72,7 +71,7 @@ func TestFileTargetSync(t *testing.T) {
path := logDir1 + "/*.log"
target, err := NewFileTarget(metrics, logger, client, ps, path, "", nil, nil, &Config{
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
}, DefaultWatchConig, nil, fakeHandler, "", nil)
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Start with nothing watched.
Expand All @@ -84,7 +83,7 @@ func TestFileTargetSync(t *testing.T) {
}

// Create the base dir, still nothing watched.
err = os.MkdirAll(logDir1, 0750)
err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)

err = target.sync()
Expand Down Expand Up @@ -191,7 +190,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

_, err = os.Create(logFile)
Expand Down Expand Up @@ -247,95 +246,6 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
`), "promtail_files_active_total"))
}

func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

tempDir := t.TempDir()
positionsFileName := filepath.Join(tempDir, "positions.yml")

ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Millisecond,
PositionsFile: positionsFileName,
})
require.NoError(t, err)

client := fake.New(func() {})
defer client.Stop()

pathToWatch := filepath.Join(tempDir, "*.log")
registry := prometheus.NewRegistry()
metrics := NewMetrics(registry)

// Increase this to several thousand to make the test more likely to fail when debugging a race condition
iterations := 500
fakeHandler := make(chan fileTargetEvent, 10*iterations)
for i := 0; i < iterations; i++ {
logFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.log", i))

target, err := NewFileTarget(metrics, logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

file, err := os.Create(logFile)
assert.NoError(t, err)

// Write some data to the file
for j := 0; j < 5; j++ {
_, _ = file.WriteString(fmt.Sprintf("test %d\n", j))
}
require.NoError(t, file.Close())

requireEventually(t, func() bool {
return testutil.CollectAndCount(registry, "promtail_read_lines_total") == 1
}, "expected 1 read_lines_total metric")

requireEventually(t, func() bool {
return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 1
}, "expected 1 read_bytes_total metric")

requireEventually(t, func() bool {
return testutil.ToFloat64(metrics.readLines) == 5
}, "expected 5 read_lines_total")

requireEventually(t, func() bool {
return testutil.ToFloat64(metrics.totalBytes) == 35
}, "expected 35 total_bytes")

requireEventually(t, func() bool {
return testutil.ToFloat64(metrics.readBytes) == 35
}, "expected 35 read_bytes")

// Concurrently stop the target and remove the file
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
sleepRandomDuration(time.Millisecond * 10)
target.Stop()
wg.Done()

}()
go func() {
sleepRandomDuration(time.Millisecond * 10)
_ = os.Remove(logFile)
wg.Done()
}()

wg.Wait()

requireEventually(t, func() bool {
return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 0
}, "expected read_bytes_total metric to be cleaned up")

requireEventually(t, func() bool {
return testutil.CollectAndCount(registry, "promtail_file_bytes_total") == 0
}, "expected file_bytes_total metric to be cleaned up")
}

ps.Stop()
}

// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send.
func TestFileTarget_StopAbruptly(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
Expand Down Expand Up @@ -367,11 +277,11 @@ func TestFileTarget_StopAbruptly(t *testing.T) {
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Create a directory, still nothing is watched.
err = os.MkdirAll(logDir1, 0750)
err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile1)
assert.NoError(t, err)
Expand All @@ -392,12 +302,12 @@ func TestFileTarget_StopAbruptly(t *testing.T) {

// Create two directories - one more than the buffer of fakeHandler,
// so that the file target hands until we call Stop().
err = os.MkdirAll(logDir2, 0750)
err = os.MkdirAll(logDir2, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile2)
assert.NoError(t, err)

err = os.MkdirAll(logDir3, 0750)
err = os.MkdirAll(logDir3, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile3)
assert.NoError(t, err)
Expand Down Expand Up @@ -479,7 +389,7 @@ func TestFileTargetPathExclusion(t *testing.T) {
pathExclude := filepath.Join(dirName, "log3", "*.log")
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
}, DefaultWatchConig, nil, fakeHandler, "", nil)
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Start with nothing watched.
Expand All @@ -491,11 +401,11 @@ func TestFileTargetPathExclusion(t *testing.T) {
}

// Create the base directories, still nothing watched.
err = os.MkdirAll(logDir1, 0750)
err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)
err = os.MkdirAll(logDir2, 0750)
err = os.MkdirAll(logDir2, 0o750)
assert.NoError(t, err)
err = os.MkdirAll(logDir3, 0750)
err = os.MkdirAll(logDir3, 0o750)
assert.NoError(t, err)

err = target.sync()
Expand Down Expand Up @@ -571,7 +481,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
logFile := filepath.Join(logDir, "test1.log")
logFileIgnored := filepath.Join(logDir, "test.donot.log")

if err := os.MkdirAll(logDir, 0750); err != nil {
if err := os.MkdirAll(logDir, 0o750); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -610,7 +520,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
// To handle file creation event from channel, set enough long time as sync period
SyncPeriod: 10 * time.Minute,
}, DefaultWatchConig, fakeFileHandler, fakeTargetHandler, "", nil)
}, DefaultWatchConfig, fakeFileHandler, fakeTargetHandler, "", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -653,7 +563,6 @@ func TestToStopTailing(t *testing.T) {
t.Error("Results mismatch, expected", expected[i], "got", st[i])
}
}

}

func BenchmarkToStopTailing(b *testing.B) {
Expand Down Expand Up @@ -717,7 +626,6 @@ func TestMissing(t *testing.T) {
if _, ok := c["str3"]; !ok {
t.Error("Expected the set to contain str3 but it did not")
}

}

func requireEventually(t *testing.T, f func() bool, msg string) {
Expand Down
11 changes: 5 additions & 6 deletions clients/pkg/promtail/targets/file/filetargetmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func newTestLogDirectories(t *testing.T) string {
tmpDir := t.TempDir()
logFileDir := filepath.Join(tmpDir, "logs")
err := os.MkdirAll(logFileDir, 0750)
err := os.MkdirAll(logFileDir, 0o750)
assert.NoError(t, err)
return logFileDir
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi
}

metrics := NewMetrics(nil)
ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConig)
ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestDeadlockStartWatchingDuringSync(t *testing.T) {
go func() {
for i := 0; i < 10; i++ {
dir := filepath.Join(newLogDir, fmt.Sprintf("%d", i))
err := os.MkdirAll(dir, 0750)
err := os.MkdirAll(dir, 0o750)
assert.NoError(t, err)
time.Sleep(1 * time.Millisecond)
for j := 0; j < 100; j++ {
Expand Down Expand Up @@ -551,13 +551,13 @@ func TestLabelSetUpdate(t *testing.T) {
},
}

var target = model.LabelSet{
target := model.LabelSet{
hostLabel: "localhost",
pathLabel: "baz",
"job": "foo",
}

var target2 = model.LabelSet{
target2 := model.LabelSet{
hostLabel: "localhost",
pathLabel: "baz",
"job": "foo2",
Expand Down Expand Up @@ -593,7 +593,6 @@ func TestLabelSetUpdate(t *testing.T) {
}, targetEventHandler)
require.Equal(t, 0, len(syncer.targets))
require.Equal(t, 0, len(syncer.fileEventWatchers))

}

func TestFulfillKubePodSelector(t *testing.T) {
Expand Down

0 comments on commit 2ab63d2

Please sign in to comment.