Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ingester.spread-flushes from flush time to sample add time #1578

Merged
merged 3 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,6 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
return noFlush
}

if i.cfg.SpreadFlushes {
now := model.Now()
// Map from the fingerprint hash to a fixed point in the cycle of period MaxChunkAge
startOfCycle := now.Add(-(now.Sub(model.Time(0)) % i.cfg.MaxChunkAge))
slot := startOfCycle.Add(time.Duration(fp) % i.cfg.MaxChunkAge)
// If that point is now, to the resolution of FlushCheckPeriod, flush the chunk.
if slot >= now && slot < now.Add(i.cfg.FlushCheckPeriod) {
return reasonAged
}
// If we missed the slot due to timing it will get caught by the MaxChunkAge check below, some time later.
}
// Adjust max age slightly to spread flushes out over time
var jitter time.Duration
if i.cfg.ChunkAgeJitter != 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp mode
}()

prevNumChunks := len(series.chunkDescs)
if i.cfg.SpreadFlushes && prevNumChunks > 0 {
// Map from the fingerprint hash to a point in the cycle of period MaxChunkAge
startOfCycle := timestamp.Add(-(timestamp.Sub(model.Time(0)) % i.cfg.MaxChunkAge))
slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge)))
// If adding this sample means the head chunk will span that point in time, close so it will get flushed
if series.head().FirstTime < slot && timestamp >= slot {
series.closeHead()
}
}

if err := series.add(model.SamplePair{
Value: value,
Timestamp: timestamp,
Expand Down
30 changes: 25 additions & 5 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ func runTestQueryTimes(ctx context.Context, t *testing.T, ing *Ingester, ty labe
return res, req, nil
}

func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries int) ([]string, map[string]model.Matrix) {
func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries, offset int) ([]string, map[string]model.Matrix) {
userIDs := []string{"1", "2", "3"}

// Create test samples.
testData := map[string]model.Matrix{}
for i, userID := range userIDs {
testData[userID] = buildTestMatrix(numSeries, samplesPerSeries, i)
testData[userID] = buildTestMatrix(numSeries, samplesPerSeries, i+offset)
}

// Append samples.
Expand All @@ -166,7 +166,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries in

func TestIngesterAppend(t *testing.T) {
store, ing := newDefaultTestStore(t)
userIDs, testData := pushTestSamples(t, ing, 10, 1000)
userIDs, testData := pushTestSamples(t, ing, 10, 1000, 0)

// Read samples back via ingester queries.
for _, userID := range userIDs {
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestIngesterAppend(t *testing.T) {
func TestIngesterSendsOnlySeriesWithData(t *testing.T) {
_, ing := newDefaultTestStore(t)

userIDs, _ := pushTestSamples(t, ing, 10, 1000)
userIDs, _ := pushTestSamples(t, ing, 10, 1000, 0)

// Read samples back via ingester queries.
for _, userID := range userIDs {
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestIngesterIdleFlush(t *testing.T) {
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())

userIDs, testData := pushTestSamples(t, ing, 4, 100)
userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)

// wait beyond idle time so samples flush
time.Sleep(cfg.MaxChunkIdle * 2)
Expand All @@ -251,6 +251,26 @@ func TestIngesterIdleFlush(t *testing.T) {
}
}

// TestIngesterSpreadFlush pushes one batch of samples to an ingester that has
// SpreadFlushes enabled, then pushes a single extra sample per series at a much
// later offset, and finally checks that the first batch was written to the store.
func TestIngesterSpreadFlush(t *testing.T) {
// Create test ingester with short flush cycle
cfg := defaultIngesterTestConfig()
cfg.SpreadFlushes = true
cfg.FlushCheckPeriod = 20 * time.Millisecond
store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())

// First batch: numSeries=4, samplesPerSeries=100, offset=0. Only this batch's
// data (testData) is verified against the store at the end.
userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)

// add another sample with timestamp at the end of the cycle to trigger
// head closes and get an extra chunk so we will flush the first one
// The offset is (MaxChunkAge in whole seconds - 1) * 1000; presumably this is
// a millisecond offset into the cycle — confirm against buildTestMatrix.
// The return values are discarded because this second batch is not verified.
_, _ = pushTestSamples(t, ing, 4, 1, int(cfg.MaxChunkAge.Seconds()-1)*1000)

// wait beyond flush time so first set of samples should be sent to store
// NOTE(review): this relies on wall-clock timing (2x FlushCheckPeriod = 40ms),
// so it may be sensitive to slow or heavily loaded test machines.
time.Sleep(cfg.FlushCheckPeriod * 2)

// check the first set of samples has been sent to the store
store.checkData(t, userIDs, testData)
}

type stream struct {
grpc.ServerStream
ctx context.Context
Expand Down
4 changes: 1 addition & 3 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ func newMemorySeries(m labels.Labels) *memorySeries {
}
}

// add adds a sample pair to the series. It returns the number of newly
// completed chunks (which are now eligible for persistence).
//
// add adds a sample pair to the series, possibly creating a new chunk.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(v model.SamplePair) error {
// If sender has repeated the same timestamp, check more closely and perhaps return error.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/user_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestForSeriesMatchingBatching(t *testing.T) {
} {
t.Run(fmt.Sprintf("numSeries=%d,batchSize=%d,matchers=%s", tc.numSeries, tc.batchSize, tc.matchers), func(t *testing.T) {
_, ing := newDefaultTestStore(t)
userIDs, _ := pushTestSamples(t, ing, tc.numSeries, 100)
userIDs, _ := pushTestSamples(t, ing, tc.numSeries, 100, 0)

for _, userID := range userIDs {
ctx := user.InjectOrgID(context.Background(), userID)
Expand Down