diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index e1b0e7b5d8..ed1c6cde65 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -191,17 +191,6 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
 		return noFlush
 	}
 
-	if i.cfg.SpreadFlushes {
-		now := model.Now()
-		// Map from the fingerprint hash to a fixed point in the cycle of period MaxChunkAge
-		startOfCycle := now.Add(-(now.Sub(model.Time(0)) % i.cfg.MaxChunkAge))
-		slot := startOfCycle.Add(time.Duration(fp) % i.cfg.MaxChunkAge)
-		// If that point is now, to the resolution of FlushCheckPeriod, flush the chunk.
-		if slot >= now && slot < now.Add(i.cfg.FlushCheckPeriod) {
-			return reasonAged
-		}
-		// If we missed the slot due to timing it will get caught by the MaxChunkAge check below, some time later.
-	}
 	// Adjust max age slightly to spread flushes out over time
 	var jitter time.Duration
 	if i.cfg.ChunkAgeJitter != 0 {
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 5eacca7a7c..ee682ca7da 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -301,6 +301,16 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp mode
 	}()
 
 	prevNumChunks := len(series.chunkDescs)
+	if i.cfg.SpreadFlushes && prevNumChunks > 0 {
+		// Map from the fingerprint hash to a point in the cycle of period MaxChunkAge
+		startOfCycle := timestamp.Add(-(timestamp.Sub(model.Time(0)) % i.cfg.MaxChunkAge))
+		slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge)))
+		// If adding this sample means the head chunk will span that point in time, close it so it will get flushed
+		if series.head().FirstTime < slot && timestamp >= slot {
+			series.closeHead()
+		}
+	}
+
 	if err := series.add(model.SamplePair{
 		Value:     value,
 		Timestamp: timestamp,
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 3e3959a92d..5a7e80abda 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -145,13 +145,13 @@ func runTestQueryTimes(ctx context.Context, t *testing.T, ing *Ingester, ty labe
 	return res, req, nil
 }
 
-func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries int) ([]string, map[string]model.Matrix) {
+func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries, offset int) ([]string, map[string]model.Matrix) {
 	userIDs := []string{"1", "2", "3"}
 
 	// Create test samples.
 	testData := map[string]model.Matrix{}
 	for i, userID := range userIDs {
-		testData[userID] = buildTestMatrix(numSeries, samplesPerSeries, i)
+		testData[userID] = buildTestMatrix(numSeries, samplesPerSeries, i+offset)
 	}
 
 	// Append samples.
@@ -166,7 +166,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries in
 
 func TestIngesterAppend(t *testing.T) {
 	store, ing := newDefaultTestStore(t)
-	userIDs, testData := pushTestSamples(t, ing, 10, 1000)
+	userIDs, testData := pushTestSamples(t, ing, 10, 1000, 0)
 
 	// Read samples back via ingester queries.
 	for _, userID := range userIDs {
@@ -194,7 +194,7 @@ func TestIngesterAppend(t *testing.T) {
 
 func TestIngesterSendsOnlySeriesWithData(t *testing.T) {
 	_, ing := newDefaultTestStore(t)
-	userIDs, _ := pushTestSamples(t, ing, 10, 1000)
+	userIDs, _ := pushTestSamples(t, ing, 10, 1000, 0)
 
 	// Read samples back via ingester queries.
 	for _, userID := range userIDs {
@@ -224,7 +224,7 @@ func TestIngesterIdleFlush(t *testing.T) {
 	cfg.RetainPeriod = 500 * time.Millisecond
 	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
 
-	userIDs, testData := pushTestSamples(t, ing, 4, 100)
+	userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)
 
 	// wait beyond idle time so samples flush
 	time.Sleep(cfg.MaxChunkIdle * 2)
@@ -251,6 +251,26 @@ func TestIngesterIdleFlush(t *testing.T) {
 	}
 }
 
+func TestIngesterSpreadFlush(t *testing.T) {
+	// Create test ingester with short flush cycle
+	cfg := defaultIngesterTestConfig()
+	cfg.SpreadFlushes = true
+	cfg.FlushCheckPeriod = 20 * time.Millisecond
+	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+
+	userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)
+
+	// add another sample with a timestamp at the end of the cycle to close the
+	// head chunk; the extra chunk means the first one will be flushed
+	_, _ = pushTestSamples(t, ing, 4, 1, int(cfg.MaxChunkAge.Seconds()-1)*1000)
+
+	// wait beyond the flush check period so the first set of samples is sent to the store
+	time.Sleep(cfg.FlushCheckPeriod * 2)
+
+	// check that the first set of samples has been sent to the store
+	store.checkData(t, userIDs, testData)
+}
+
 type stream struct {
 	grpc.ServerStream
 	ctx context.Context
diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go
index 023bef52d2..3c7bb72542 100644
--- a/pkg/ingester/series.go
+++ b/pkg/ingester/series.go
@@ -59,9 +59,7 @@ func newMemorySeries(m labels.Labels) *memorySeries {
 	}
 }
 
-// add adds a sample pair to the series. It returns the number of newly
-// completed chunks (which are now eligible for persistence).
-//
+// add adds a sample pair to the series, possibly creating a new chunk.
 // The caller must have locked the fingerprint of the series.
 func (s *memorySeries) add(v model.SamplePair) error {
 	// If sender has repeated the same timestamp, check more closely and perhaps return error.
diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go
index a58a0eb161..5df173dcf1 100644
--- a/pkg/ingester/user_state_test.go
+++ b/pkg/ingester/user_state_test.go
@@ -43,7 +43,7 @@ func TestForSeriesMatchingBatching(t *testing.T) {
 	} {
 		t.Run(fmt.Sprintf("numSeries=%d,batchSize=%d,matchers=%s", tc.numSeries, tc.batchSize, tc.matchers), func(t *testing.T) {
 			_, ing := newDefaultTestStore(t)
-			userIDs, _ := pushTestSamples(t, ing, tc.numSeries, 100)
+			userIDs, _ := pushTestSamples(t, ing, tc.numSeries, 100, 0)
 
 			for _, userID := range userIDs {
 				ctx := user.InjectOrgID(context.Background(), userID)
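
Note for reviewers: below is a minimal, standalone sketch of the slot arithmetic the new append-path check relies on. It assumes nanosecond time.Duration arithmetic as produced by model.Time.Sub; the flushSlot helper, its sinceEpoch parameter, and the example fingerprints are illustrative, not part of the patch.

```go
package main

import (
	"fmt"
	"time"
)

// flushSlot mirrors the computation added to Ingester.append: map a series
// fingerprint to a fixed offset within a repeating cycle of period
// maxChunkAge. sinceEpoch stands in for timestamp.Sub(model.Time(0)).
func flushSlot(sinceEpoch time.Duration, fp uint64, maxChunkAge time.Duration) time.Duration {
	startOfCycle := sinceEpoch - sinceEpoch%maxChunkAge
	return startOfCycle + time.Duration(fp%uint64(maxChunkAge))
}

func main() {
	const maxChunkAge = 12 * time.Hour
	now := time.Duration(time.Now().UnixNano())

	// Different fingerprints land at different offsets, so head chunks are
	// closed (and become flushable) at points spread across the cycle
	// instead of all expiring together at MaxChunkAge.
	for _, fp := range []uint64{0xdeadbeef, 0xcafebabe00001234} {
		slot := flushSlot(now, fp, maxChunkAge)
		offset := slot - (now - now%maxChunkAge)
		fmt.Printf("fp %#x: head chunk closes %v into each %v cycle\n", fp, offset, maxChunkAge)
	}
}
```

Because the slot is derived only from the series fingerprint and MaxChunkAge, each series gets a deterministic cut point, and the head chunk is now closed at append time as soon as a sample crosses that point. That is what makes the removed window test in shouldFlushChunk unnecessary: it only fired if the flush loop happened to wake up within FlushCheckPeriod of the slot, and otherwise fell back to the later MaxChunkAge check.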