Skip to content

Commit

Permalink
Report chunks flushed by spread-flushes option under separate label (#…
Browse files Browse the repository at this point in the history
…1978)

* Report chunks flushed by spread-flushes option under separate label

This improves observability of flushing, creating separate labels for
chunks that overflowed versus to series that reached their time under
spread-flushes behaviour.

The flushReason type shrinks to int8 to avoid bloating the chunk desc object.

Signed-off-by: Bryan Boreham <bryan@weave.works>

* Add changelog note about new flush_reasons label value

Signed-off-by: Bryan Boreham <bryan@weave.works>
  • Loading branch information
bboreham authored and gouthamve committed Jan 15, 2020
1 parent 209fe10 commit ec2d8c4
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master / unreleased

* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled.

* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`
* `ruler.group-timeout`has been removed
Expand Down
10 changes: 8 additions & 2 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (i *Ingester) sweepUsers(immediate bool) {
oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
}

type flushReason int
type flushReason int8

const (
noFlush = iota
Expand All @@ -146,6 +146,7 @@ const (
reasonAged
reasonIdle
reasonStale
reasonSpreadFlush
)

func (f flushReason) String() string {
Expand All @@ -162,6 +163,8 @@ func (f flushReason) String() string {
return "Idle"
case reasonStale:
return "Stale"
case reasonSpreadFlush:
return "Spread"
default:
panic("unrecognised flushReason")
}
Expand Down Expand Up @@ -196,6 +199,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint,

// Flush if we have more than one chunk, and haven't already flushed the first chunk
if len(series.chunkDescs) > 1 && !series.chunkDescs[0].flushed {
if series.chunkDescs[0].flushReason != noFlush {
return series.chunkDescs[0].flushReason
}
return reasonMultipleChunksInSeries
} else if len(series.chunkDescs) > 0 {
// Otherwise look in more detail at the first chunk
Expand Down Expand Up @@ -287,7 +293,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
chunks := series.chunkDescs
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) {
series.closeHead()
series.closeHead(reasonImmediate)

This comment has been minimized.

Copy link
@bboreham

bboreham Jan 22, 2020

Author Contributor

This reason is incorrect for the second half of the if condition, sorry.

} else {
chunks = chunks[:len(chunks)-1]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge)))
// If adding this sample means the head chunk will span that point in time, close so it will get flushed
if series.head().FirstTime < slot && timestamp >= slot {
series.closeHead()
series.closeHead(reasonSpreadFlush)
}
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) {
return first, last, iter.Err()
}

func (s *memorySeries) closeHead() {
// closeHead marks the head chunk closed. The caller must have locked
// the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors.
func (s *memorySeries) closeHead(reason flushReason) {
s.chunkDescs[0].flushReason = reason
s.headChunkClosed = true
}

Expand Down Expand Up @@ -212,11 +216,12 @@ func (s *memorySeries) isStale() bool {
}

type desc struct {
C encoding.Chunk // nil if chunk is evicted.
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.
LastTime model.Time // Timestamp of last sample. Populated at creation & on append.
LastUpdate model.Time // This server's local time on last change
flushed bool // set to true when flush succeeds
C encoding.Chunk // nil if chunk is evicted.
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.
LastTime model.Time // Timestamp of last sample. Populated at creation & on append.
LastUpdate model.Time // This server's local time on last change
flushReason flushReason // If chunk is closed, holds the reason why.
flushed bool // set to true when flush succeeds
}

func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc {
Expand Down

0 comments on commit ec2d8c4

Please sign in to comment.