diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index b3f207119f857..883690cbcca08 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -133,6 +133,7 @@ func (i *Ingester) flush(mayRemoveStreams bool) { } i.flushQueuesDone.Wait() + level.Debug(util_log.Logger).Log("msg", "flush queues have drained") } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 2942eed517f67..d685d02dacc9e 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -24,6 +24,7 @@ type ingesterMetrics struct { recoveredEntriesTotal prometheus.Counter recoveredBytesTotal prometheus.Counter recoveryBytesInUse prometheus.Gauge + recoveryIsFlushing prometheus.Gauge } // setRecoveryBytesInUse bounds the bytes reports to >= 0. @@ -107,5 +108,9 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { Name: "loki_ingester_wal_bytes_in_use", Help: "Total number of bytes in use by the WAL recovery process.", }), + recoveryIsFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_wal_replay_flushing", + Help: "Whether the wal replay is in a flushing phase due to backpressure", + }), } } diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index 0b72a618ffcc5..72fe9d513acfa 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -3,6 +3,9 @@ package ingester import ( "sync" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/dustin/go-humanize" + "github.com/go-kit/kit/log/level" "go.uber.org/atomic" ) @@ -28,6 +31,7 @@ func (f *replayFlusher) Flush() { instance.streamsMtx.Unlock() } + } type Flusher interface { @@ -70,8 +74,23 @@ func (c *replayController) Cur() int { func (c *replayController) Flush() { if c.isFlushing.CAS(false, true) { + c.metrics.recoveryIsFlushing.Set(1) + prior := c.currentBytes.Load() + level.Debug(util_log.Logger).Log( + "msg", "replay flusher pre-flush", + "bytes", humanize.Bytes(uint64(prior)), + ) + c.flusher.Flush() + + after := c.currentBytes.Load() + level.Debug(util_log.Logger).Log( + "msg", "replay flusher post-flush", + "bytes", humanize.Bytes(uint64(after)), + ) + c.isFlushing.Store(false) + c.metrics.recoveryIsFlushing.Set(0) // Broadcast after lock is acquired to prevent race conditions with cpu scheduling // where the flush code could finish before the goroutine which initiated it gets to call