diff --git a/CHANGELOG.md b/CHANGELOG.md index 84467d7c45..8f084da5f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master / unreleased +* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. + * [CHANGE] Flags changed with transition to upstream Prometheus rules manager: * `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url` * `ruler.group-timeout`has been removed diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d97dc0eb7b..49b229229e 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -137,7 +137,7 @@ func (i *Ingester) sweepUsers(immediate bool) { oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix())) } -type flushReason int +type flushReason int8 const ( noFlush = iota @@ -146,6 +146,7 @@ const ( reasonAged reasonIdle reasonStale + reasonSpreadFlush ) func (f flushReason) String() string { @@ -162,6 +163,8 @@ func (f flushReason) String() string { return "Idle" case reasonStale: return "Stale" + case reasonSpreadFlush: + return "Spread" default: panic("unrecognised flushReason") } @@ -196,6 +199,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, // Flush if we have more than one chunk, and haven't already flushed the first chunk if len(series.chunkDescs) > 1 && !series.chunkDescs[0].flushed { + if series.chunkDescs[0].flushReason != noFlush { + return series.chunkDescs[0].flushReason + } return reasonMultipleChunksInSeries } else if len(series.chunkDescs) > 0 { // Otherwise look in more detail at the first chunk @@ -287,7 +293,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. chunks := series.chunkDescs if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) { - series.closeHead() + series.closeHead(reasonImmediate) } else { chunks = chunks[:len(chunks)-1] } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ffa80ced26..1ada90d16b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -355,7 +355,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge))) // If adding this sample means the head chunk will span that point in time, close so it will get flushed if series.head().FirstTime < slot && timestamp >= slot { - series.closeHead() + series.closeHead(reasonSpreadFlush) } } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 7ce8f42f8b..5bf2f57f0e 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -123,7 +123,11 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { return first, last, iter.Err() } -func (s *memorySeries) closeHead() { +// closeHead marks the head chunk closed. The caller must have locked +// the fingerprint of the memorySeries. This method will panic if this +// series has no chunk descriptors. +func (s *memorySeries) closeHead(reason flushReason) { + s.chunkDescs[0].flushReason = reason s.headChunkClosed = true } @@ -212,11 +216,12 @@ func (s *memorySeries) isStale() bool { } type desc struct { - C encoding.Chunk // nil if chunk is evicted. - FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. - LastTime model.Time // Timestamp of last sample. Populated at creation & on append. - LastUpdate model.Time // This server's local time on last change - flushed bool // set to true when flush succeeds + C encoding.Chunk // nil if chunk is evicted. + FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. + LastTime model.Time // Timestamp of last sample. Populated at creation & on append. + LastUpdate model.Time // This server's local time on last change + flushReason flushReason // If chunk is closed, holds the reason why. + flushed bool // set to true when flush succeeds } func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc {