diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 01207d63032bf..d136e08210466 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -167,7 +167,8 @@ func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr { } // TODO(owen-d): use a pool - chunks, err := toWireChunks(stream.chunks, nil) + // Only send chunks for checkpointing that have yet to be flushed. + chunks, err := toWireChunks(unflushedChunks(stream.chunks), nil) stream.chunkMtx.RUnlock() var s *Series @@ -506,3 +507,15 @@ func (c *Checkpointer) Run() { } } } + +func unflushedChunks(descs []chunkDesc) []chunkDesc { + filtered := make([]chunkDesc, 0, len(descs)) + + for _, d := range descs { + if d.flushed.IsZero() { + filtered = append(filtered, d) + } + } + + return filtered +} diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 2d2648682791e..30ef04be17731 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -227,6 +227,20 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } +func TestUnflushedChunks(t *testing.T) { + chks := []chunkDesc{ + { + flushed: time.Now(), + }, + {}, + { + flushed: time.Now(), + }, + } + + require.Equal(t, 1, len(unflushedChunks(chks))) +} + func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { fs, err := ioutil.ReadDir(walDir) require.Nil(t, err)