Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAL backpressure #3218

Merged
merged 15 commits into from
Jan 27, 2021
Merged
Prev Previous commit
Next Next commit
more lenient expectCheckpoint function
  • Loading branch information
owen-d committed Jan 22, 2021
commit 1b981ffad68072e3e553e8e8d5a8e22978fe8ee1
38 changes: 24 additions & 14 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// ensure we haven't checkpointed yet
expectCheckpoint(t, walDir, false)
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
Expand All @@ -122,9 +122,8 @@ func TestIngesterWAL(t *testing.T) {
// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)

time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer
// ensure we have checkpointed now
expectCheckpoint(t, walDir, true)
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

Expand Down Expand Up @@ -278,7 +277,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// ensure we haven't checkpointed yet
expectCheckpoint(t, walDir, false)
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
Expand Down Expand Up @@ -320,9 +319,8 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
_, err = i.Push(ctx, req)
require.NoError(t, err)

time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer
// ensure we have checkpointed now
expectCheckpoint(t, walDir, true)
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

Expand All @@ -333,17 +331,29 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
var found bool
for _, f := range fs {
if _, err := checkpointIndex(f.Name(), false); err == nil {
found = true
func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Duration) {
deadline := time.After(max)
for {
select {
case <-deadline:
require.Fail(t, "timeout while waiting for checkpoint existence:", shouldExist)
default:
<-time.After(max / 10) // check 10x over the duration
}

fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
var found bool
for _, f := range fs {
if _, err := checkpointIndex(f.Name(), false); err == nil {
found = true
}
}
if found == shouldExist {
return
}
}

require.True(t, found == shouldExist)
}

// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams
Expand Down