Skip to content
This repository has been archived by the owner on Jan 12, 2023. It is now read-only.

Commit

Permalink
Squashed commit of the PR cortexproject#2022:
Browse files Browse the repository at this point in the history
commit df8b95f
Author: Bryan Boreham <bryan@weave.works>
Date:   Wed Jan 22 13:52:44 2020 +0000

    Pass the correct flush reason to closeHead()

    Pass the reason from `shouldFlushChunk()` to `closeHead()`;
    sending `reasonImmediate` was a bug.

    We don't need to check `len(chunks)` is non-zero, since that would
    return `noFlush` from `shouldFlushSeries()`.

    Signed-off-by: Bryan Boreham <bryan@weave.works>

commit 17c0c21
Author: Bryan Boreham <bryan@weave.works>
Date:   Wed Jan 22 13:45:22 2020 +0000

    Make sure we return noFlush on an empty series

    Signed-off-by: Bryan Boreham <bryan@weave.works>

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Jan 23, 2020
1 parent 943f572 commit bdb0264
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo
}

func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, immediate bool) flushReason {
if len(series.chunkDescs) == 0 {
return noFlush
}
if immediate {
return reasonImmediate
}
Expand All @@ -203,12 +206,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint,
return series.chunkDescs[0].flushReason
}
return reasonMultipleChunksInSeries
} else if len(series.chunkDescs) > 0 {
// Otherwise look in more detail at the first chunk
return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale())
}

return noFlush
// Otherwise look in more detail at the first chunk
return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale())
}

func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason {
Expand Down Expand Up @@ -290,11 +290,14 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
return nil
}

// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
// shouldFlushSeries() has told us we have at least one chunk
chunks := series.chunkDescs
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) {
if immediate {
series.closeHead(reasonImmediate)
} else if chunkReason := i.shouldFlushChunk(series.head(), fp, series.isStale()); chunkReason != noFlush {
series.closeHead(chunkReason)
} else {
// The head chunk doesn't need flushing; step back by one.
chunks = chunks[:len(chunks)-1]
}

Expand Down

0 comments on commit bdb0264

Please sign in to comment.