Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report chunks flushed by spread-flushes option under separate label #1978

Merged
merged 2 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master / unreleased

* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled.

* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`
* `ruler.group-timeout`has been removed
Expand Down
10 changes: 8 additions & 2 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (i *Ingester) sweepUsers(immediate bool) {
oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
}

type flushReason int
type flushReason int8

const (
noFlush = iota
Expand All @@ -146,6 +146,7 @@ const (
reasonAged
reasonIdle
reasonStale
reasonSpreadFlush
)

func (f flushReason) String() string {
Expand All @@ -162,6 +163,8 @@ func (f flushReason) String() string {
return "Idle"
case reasonStale:
return "Stale"
case reasonSpreadFlush:
return "Spread"
default:
panic("unrecognised flushReason")
}
Expand Down Expand Up @@ -196,6 +199,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint,

// Flush if we have more than one chunk, and haven't already flushed the first chunk
if len(series.chunkDescs) > 1 && !series.chunkDescs[0].flushed {
if series.chunkDescs[0].flushReason != noFlush {
return series.chunkDescs[0].flushReason
}
return reasonMultipleChunksInSeries
} else if len(series.chunkDescs) > 0 {
// Otherwise look in more detail at the first chunk
Expand Down Expand Up @@ -287,7 +293,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
chunks := series.chunkDescs
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) {
series.closeHead()
series.closeHead(reasonImmediate)
} else {
chunks = chunks[:len(chunks)-1]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge)))
// If adding this sample means the head chunk will span that point in time, close so it will get flushed
if series.head().FirstTime < slot && timestamp >= slot {
series.closeHead()
series.closeHead(reasonSpreadFlush)
}
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) {
return first, last, iter.Err()
}

func (s *memorySeries) closeHead() {
// closeHead marks the head chunk closed. The caller must have locked
// the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors.
func (s *memorySeries) closeHead(reason flushReason) {
s.chunkDescs[0].flushReason = reason
s.headChunkClosed = true
}

Expand Down Expand Up @@ -212,11 +216,12 @@ func (s *memorySeries) isStale() bool {
}

type desc struct {
C encoding.Chunk // nil if chunk is evicted.
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.
LastTime model.Time // Timestamp of last sample. Populated at creation & on append.
LastUpdate model.Time // This server's local time on last change
flushed bool // set to true when flush succeeds
C encoding.Chunk // nil if chunk is evicted.
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.
LastTime model.Time // Timestamp of last sample. Populated at creation & on append.
LastUpdate model.Time // This server's local time on last change
flushReason flushReason // If chunk is closed, holds the reason why.
flushed bool // set to true when flush succeeds
}

func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc {
Expand Down