From c432464ccf3730f42df250f7237befe552c107e8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 19 Jan 2021 17:53:52 -0500 Subject: [PATCH 01/14] adds WAL replay backpressure --- pkg/chunkenc/memchunk.go | 2 +- pkg/ingester/checkpoint_test.go | 10 ++-- pkg/ingester/flush.go | 6 +++ pkg/ingester/ingester.go | 18 ++++--- pkg/ingester/instance.go | 2 +- pkg/ingester/metrics.go | 19 +++++++ pkg/ingester/recovery.go | 93 ++++++++++++++++++++------------- pkg/ingester/stream.go | 19 ++++--- pkg/ingester/stream_test.go | 10 ++-- pkg/ingester/wal.go | 18 +++++-- 10 files changed, 132 insertions(+), 65 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index d56408fcc0112..2b414315cbfb6 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -255,7 +255,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { blocks: []block{}, head: &headBlock{}, - format: chunkFormatV2, + format: chunkFormatV3, encoding: enc, } diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 2d2648682791e..9ceb61dc1a988 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -41,12 +41,10 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time, func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config { ingesterConfig := defaultIngesterTestConfig(t) ingesterConfig.MaxTransferRetries = 0 - ingesterConfig.WAL = WALConfig{ - Enabled: true, - Dir: walDir, - Recover: true, - CheckpointDuration: time.Second, - } + ingesterConfig.WAL.Enabled = true + ingesterConfig.WAL.Dir = walDir + ingesterConfig.WAL.Recover = true + ingesterConfig.WAL.CheckpointDuration = time.Second return ingesterConfig } diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index e3f840d4f5bdd..42d4e724c9bbf 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -292,16 +292,22 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { stream.chunkMtx.Lock() defer stream.chunkMtx.Unlock() prevNumChunks := len(stream.chunks) + var subtracted int for len(stream.chunks) > 0 { if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { break } + subtracted += stream.chunks[0].chunk.UncompressedSize() stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected stream.chunks = stream.chunks[1:] } memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks))) + // Signal how much data has been flushed to lessen any WAL replay pressure. + i.metrics.setRecoveryBytesInUse(i.currentReplayBytes.Sub(int64(subtracted))) + i.replayCond.Broadcast() + if len(stream.chunks) == 0 { delete(instance.streamsByFP, stream.fp) delete(instance.streams, stream.labelsString) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4d2fe2a6f9325..dc3c5551b896f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/chunkenc" @@ -150,6 +151,10 @@ type Ingester struct { // Currently only used by the WAL to signal when the disk is full. flushOnShutdownSwitch *OnceSwitch + // Only used by WAL & flusher to coordinate backpressure during replay. 
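+	// Conceptually this is a bounded producer/consumer: replay goroutines add the
+	// bytes they load to currentReplayBytes and wait on replayCond while usage is
+	// above ~90% of the configured replay ceiling, while the flusher subtracts the
+	// bytes it has flushed and broadcasts so replay can resume.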
+ currentReplayBytes atomic.Int64 + replayCond *sync.Cond + metrics *ingesterMetrics wal WAL @@ -183,6 +188,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid tailersQuit: make(chan struct{}), metrics: metrics, flushOnShutdownSwitch: &OnceSwitch{}, + replayCond: sync.NewCond(&sync.Mutex{}), } if cfg.WAL.Enabled { @@ -214,6 +220,12 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid } func (i *Ingester) starting(ctx context.Context) error { + i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) + for j := 0; j < i.cfg.ConcurrentFlushes; j++ { + i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) + go i.flushLoop(j) + } + if i.cfg.WAL.Recover { // Disable the in process stream limit checks while replaying the WAL i.limiter.Disable() @@ -274,12 +286,6 @@ func (i *Ingester) starting(ctx context.Context) error { } - i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) - for j := 0; j < i.cfg.ConcurrentFlushes; j++ { - i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) - go i.flushLoop(j) - } - // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done err := i.lifecycler.StartAsync(context.Background()) if err != nil { diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 61d9a62fd45c5..a2f83e4f5b993 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -164,7 +164,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } - if err := stream.Push(ctx, s.Entries, record); err != nil { + if _, err := stream.Push(ctx, s.Entries, record); err != nil { appendErr = err continue } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index b907612c6f0cb..2942eed517f67 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -22,6 +22,17 @@ type ingesterMetrics struct { recoveredStreamsTotal prometheus.Counter recoveredChunksTotal prometheus.Counter recoveredEntriesTotal prometheus.Counter + recoveredBytesTotal prometheus.Counter + recoveryBytesInUse prometheus.Gauge +} + +// setRecoveryBytesInUse bounds the bytes reports to >= 0. +// TODO(owen-d): we can gain some efficiency by having the flusher never update this after recovery ends. 
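+// (The flusher keeps subtracting the size of every flushed chunk it removes, including
+// chunks that were never counted during replay, so the raw value can dip below zero.)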
+func (m *ingesterMetrics) setRecoveryBytesInUse(v int64) { + if v < 0 { + v = 0 + } + m.recoveryBytesInUse.Set(float64(v)) } const ( @@ -88,5 +99,13 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { Name: "loki_ingester_wal_recovered_entries_total", Help: "Total number of entries recovered from the WAL.", }), + recoveredBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_recovered_bytes_total", + Help: "Total number of bytes recovered from the WAL.", + }), + recoveryBytesInUse: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_wal_bytes_in_use", + Help: "Total number of bytes in use by the WAL recovery process.", + }), } } diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index fb2b355a687c7..fa8a0460753c1 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -1,7 +1,6 @@ package ingester import ( - "context" io "io" "runtime" "sync" @@ -12,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wal" + "golang.org/x/net/context" "github.com/grafana/loki/pkg/logproto" ) @@ -87,7 +87,6 @@ type Recoverer interface { Series(series *Series) error SetStream(userID string, series record.RefSeries) error Push(userID string, entries RefEntries) error - Close() Done() <-chan struct{} } @@ -95,7 +94,8 @@ type ingesterRecoverer struct { // basically map[userID]map[fingerprint]*stream users sync.Map ing *Ingester - done chan struct{} + + done chan struct{} } func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { @@ -108,32 +108,52 @@ func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { // Use all available cores func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } +func (r *ingesterRecoverer) withBackPressure(fn func() error) error { + // Account for backpressure and wait until there's enough memory to continue replaying the WAL + r.ing.replayCond.L.Lock() + defer r.ing.replayCond.L.Unlock() + + // use 90% as a threshold since we'll be adding to it. + for r.ing.currentReplayBytes.Load() > int64(r.ing.cfg.WAL.ReplayMemoryCeiling)*9/10 { + r.ing.replayCond.Wait() + } + + return fn() +} + func (r *ingesterRecoverer) Series(series *Series) error { - inst := r.ing.getOrCreateInstance(series.UserID) + return r.withBackPressure(func() error { - // TODO(owen-d): create another fn to avoid unnecessary label type conversions. - stream, err := inst.getOrCreateStream(logproto.Stream{ - Labels: client.FromLabelAdaptersToLabels(series.Labels).String(), - }, true, nil) + inst := r.ing.getOrCreateInstance(series.UserID) - if err != nil { - return err - } + // TODO(owen-d): create another fn to avoid unnecessary label type conversions. + stream, err := inst.getOrCreateStream(logproto.Stream{ + Labels: client.FromLabelAdaptersToLabels(series.Labels).String(), + }, true, nil) - added, err := stream.setChunks(series.Chunks) - if err != nil { - return err - } - r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) - r.ing.metrics.recoveredEntriesTotal.Add(float64(added)) + if err != nil { + return err + } - // now store the stream in the recovery map under the fingerprint originally recorded - // as it's possible the newly mapped fingerprint is different. This is because the WAL records - // will use this original reference. 
- got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{}) - streamsMap := got.(*sync.Map) - streamsMap.Store(series.Fingerprint, stream) - return nil + bytesAdded, entriesAdded, err := stream.setChunks(series.Chunks) + + if err != nil { + return err + } + r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) + r.ing.metrics.recoveredBytesTotal.Add(float64(bytesAdded)) + r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded)) + r.ing.metrics.setRecoveryBytesInUse(r.ing.currentReplayBytes.Add(int64(bytesAdded))) + + // now store the stream in the recovery map under the fingerprint originally recorded + // as it's possible the newly mapped fingerprint is different. This is because the WAL records + // will use this original reference. + got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{}) + streamsMap := got.(*sync.Map) + streamsMap.Store(series.Fingerprint, stream) + + return nil + }) } // SetStream is responsible for setting the key path for userIDs -> fingerprints -> streams. @@ -170,19 +190,22 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er } func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { - out, ok := r.users.Load(userID) - if !ok { - return errors.Errorf("user (%s) not set during WAL replay", userID) - } + return r.withBackPressure(func() error { + out, ok := r.users.Load(userID) + if !ok { + return errors.Errorf("user (%s) not set during WAL replay", userID) + } - s, ok := out.(*sync.Map).Load(entries.Ref) - if !ok { - return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID) - } + s, ok := out.(*sync.Map).Load(entries.Ref) + if !ok { + return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID) + } - // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) - _ = s.(*stream).Push(context.Background(), entries.Entries, nil) - return nil + // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) + bytesAdded, _ := s.(*stream).Push(context.Background(), entries.Entries, nil) + r.ing.metrics.setRecoveryBytesInUse(r.ing.currentReplayBytes.Add(int64(bytesAdded))) + return nil + }) } func (r *ingesterRecoverer) Close() { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 2c093ed8f9ad9..dde1c8b8e0c33 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -117,18 +117,19 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { } // setChunks is used during checkpoint recovery -func (s *stream) setChunks(chunks []Chunk) (entriesAdded int, err error) { +func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err error) { s.chunkMtx.Lock() defer s.chunkMtx.Unlock() chks, err := fromWireChunks(s.cfg, chunks) if err != nil { - return 0, err + return 0, 0, err } s.chunks = chks for _, c := range s.chunks { entriesAdded += c.chunk.Size() + bytesAdded += c.chunk.UncompressedSize() } - return entriesAdded, nil + return bytesAdded, entriesAdded, nil } func (s *stream) NewChunk() *chunkenc.MemChunk { @@ -139,9 +140,10 @@ func (s *stream) Push( ctx context.Context, entries []logproto.Entry, record *WALRecord, -) error { +) (int, error) { s.chunkMtx.Lock() defer s.chunkMtx.Unlock() + var bytesAdded int prevNumChunks := len(s.chunks) var lastChunkTimestamp time.Time if prevNumChunks == 0 { @@ -199,6 +201,9 @@ func (s *stream) Push( lastChunkTimestamp = 
entries[i].Timestamp s.lastLine.ts = lastChunkTimestamp s.lastLine.content = entries[i].Line + + // length of string plus + bytesAdded += len(entries[i].Line) } chunk.lastUpdated = time.Now() } @@ -264,15 +269,15 @@ func (s *stream) Push( fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries)) - return httpgrpc.Errorf(http.StatusBadRequest, buf.String()) + return bytesAdded, httpgrpc.Errorf(http.StatusBadRequest, buf.String()) } - return lastEntryWithErr.e + return bytesAdded, lastEntryWithErr.e } if len(s.chunks) != prevNumChunks { memoryChunks.Add(float64(len(s.chunks) - prevNumChunks)) } - return nil + return bytesAdded, nil } // Returns true, if chunk should be cut before adding new entry. This is done to make ingesters diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index a7a405e5764c5..793f6b4ec8645 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -44,7 +44,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NilMetrics, ) - err := s.Push(context.Background(), []logproto.Entry{ + _, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(int64(numLogs), 0), Line: "log"}, }, recordPool.GetRecord()) require.NoError(t, err) @@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs) expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String()) - err = s.Push(context.Background(), newLines, recordPool.GetRecord()) + _, err = s.Push(context.Background(), newLines, recordPool.GetRecord()) require.Error(t, err) require.Equal(t, expectErr.Error(), err.Error()) }) @@ -82,7 +82,7 @@ func TestPushDeduplication(t *testing.T) { NilMetrics, ) - err := s.Push(context.Background(), []logproto.Entry{ + written, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, @@ -91,6 +91,7 @@ func TestPushDeduplication(t *testing.T) { require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + require.Equal(t, len("test"+"newer, better test"), written) } func TestStreamIterator(t *testing.T) { @@ -164,7 +165,8 @@ func Benchmark_PushStream(b *testing.B) { for n := 0; n < b.N; n++ { rec := recordPool.GetRecord() - require.NoError(b, s.Push(ctx, e, rec)) + _, err := s.Push(ctx, e, rec) + require.NoError(b, err) recordPool.PutRecord(rec) } } diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 0cd7fd328d9b5..0a559b27336b0 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -2,6 +2,7 @@ package ingester import ( "flag" + fmt "fmt" "sync" "time" @@ -12,6 +13,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util/flagext" ) var ( @@ -20,13 +22,15 @@ var ( ) const walSegmentSize = wal.DefaultSegmentSize * 4 +const defaultCeiling = 8 << 30 // 8GB type WALConfig struct { - Enabled bool `yaml:"enabled"` - Dir string `yaml:"dir"` - Recover bool `yaml:"recover"` - CheckpointDuration time.Duration `yaml:"checkpoint_duration"` - FlushOnShutdown bool `yaml:"flush_on_shutdown"` + Enabled bool `yaml:"enabled"` + Dir string `yaml:"dir"` + Recover bool `yaml:"recover"` + CheckpointDuration time.Duration `yaml:"checkpoint_duration"` + FlushOnShutdown bool `yaml:"flush_on_shutdown"` + 
ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` } func (cfg *WALConfig) Validate() error { @@ -43,6 +47,10 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.") f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.") f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.") + + // Need to set default here + cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling) + f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", fmt.Sprintf("How much memory the WAL may use during replay before it needs to flush chunks to storage, i.e. 10GB. Defaults to %s.", flagext.ByteSize(defaultCeiling).String())) } // WAL interface allows us to have a no-op WAL when the WAL is disabled. From 5bc9e5dbb86816a64626f5fac04f7bacc26613ae Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 20 Jan 2021 09:54:13 -0500 Subject: [PATCH 02/14] backpressure fn doesnt lock during passed fn for concurrency --- pkg/ingester/recovery.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index fa8a0460753c1..75890e378c2a6 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -111,13 +111,16 @@ func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } func (r *ingesterRecoverer) withBackPressure(fn func() error) error { // Account for backpressure and wait until there's enough memory to continue replaying the WAL r.ing.replayCond.L.Lock() - defer r.ing.replayCond.L.Unlock() // use 90% as a threshold since we'll be adding to it. for r.ing.currentReplayBytes.Load() > int64(r.ing.cfg.WAL.ReplayMemoryCeiling)*9/10 { r.ing.replayCond.Wait() } + // Don't hold the lock while executing the provided function. + // This ensures we can run functions concurrently. + r.ing.replayCond.L.Unlock() + return fn() } From a56efaa126db357b1266fdfa889577c6e8de26dd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 21 Jan 2021 16:52:41 -0500 Subject: [PATCH 03/14] replayController --- pkg/ingester/replay_controller.go | 71 ++++++++++++++++++++ pkg/ingester/replay_controller_test.go | 90 ++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 pkg/ingester/replay_controller.go create mode 100644 pkg/ingester/replay_controller_test.go diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go new file mode 100644 index 0000000000000..85dbc7510ae8d --- /dev/null +++ b/pkg/ingester/replay_controller.go @@ -0,0 +1,71 @@ +package ingester + +import ( + "sync" + + "go.uber.org/atomic" +) + +type Flusher interface { + InitFlushQueues() + Flush() +} + +// replayController handles coordinating backpressure between WAL replays and chunk flushing. 
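+// Replay code wraps its work in WithBackPressure, which blocks once currentBytes
+// crosses ~90% of the configured ReplayMemoryCeiling, kicks off a Flush in the
+// background, and resumes once the flusher has freed memory and broadcast on cond.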
+type replayController struct { + cfg WALConfig + metrics *ingesterMetrics + currentBytes atomic.Int64 + cond *sync.Cond + isFlushing atomic.Bool + flusher Flusher +} + +// flusher is expected to reduce pressure via calling Sub +func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flusher) *replayController { + return &replayController{ + cfg: cfg, + metrics: metrics, + cond: sync.NewCond(&sync.Mutex{}), + flusher: flusher, + } +} + +func (c *replayController) Add(x int64) { + c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(int64(x))) +} + +func (c *replayController) Sub(x int64) { + c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(int64(x))) +} + +func (c *replayController) Cur() int { + return int(c.currentBytes.Load()) +} + +func (c *replayController) Flush() { + if c.isFlushing.CAS(false, true) { + c.flusher.InitFlushQueues() + c.flusher.Flush() + c.isFlushing.Store(false) + c.cond.Broadcast() + } +} + +func (c *replayController) WithBackPressure(fn func() error) error { + // Account for backpressure and wait until there's enough memory to continue replaying the WAL + c.cond.L.Lock() + + // use 90% as a threshold since we'll be adding to it. + for c.Cur() > int(c.cfg.ReplayMemoryCeiling)*9/10 { + // too much backpressure, flush + go c.Flush() + c.cond.Wait() + } + + // Don't hold the lock while executing the provided function. + // This ensures we can run functions concurrently. + c.cond.L.Unlock() + + return fn() +} diff --git a/pkg/ingester/replay_controller_test.go b/pkg/ingester/replay_controller_test.go new file mode 100644 index 0000000000000..0392c8d088549 --- /dev/null +++ b/pkg/ingester/replay_controller_test.go @@ -0,0 +1,90 @@ +package ingester + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type dumbFlusher struct { + onInit, onFlush func() +} + +func newDumbFlusher(onInit, onFlush func()) *dumbFlusher { + return &dumbFlusher{ + onInit: onInit, + onFlush: onFlush, + } +} + +func (f *dumbFlusher) InitFlushQueues() { + if f.onInit != nil { + f.onInit() + } +} +func (f *dumbFlusher) Flush() { + if f.onFlush != nil { + f.onFlush() + } +} + +func nilMetrics() *ingesterMetrics { return newIngesterMetrics(nil) } + +func TestReplayController(t *testing.T) { + var ops []string + var opLock sync.Mutex + + var rc *replayController + flusher := newDumbFlusher( + func() { + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "InitFlushQueues") + }, + func() { + rc.Sub(100) // simulate flushing 100 bytes + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "Flush") + }, + ) + rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher) + + var wg sync.WaitGroup + n := 5 + wg.Add(n) + + for i := 0; i < n; i++ { + // In order to prevent all the goroutines from running before they've added bytes + // to the internal count, introduce a brief sleep. 
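+		// (It also keeps the expected sequence of ops below deterministic, since each
+		// goroutine observes the byte count left behind by the previous one.)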
+ time.Sleep(time.Millisecond) + + // nolint:errcheck + go rc.WithBackPressure(func() error { + rc.Add(50) + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "WithBackPressure") + wg.Done() + return nil + }) + } + + wg.Wait() + + expected := []string{ + "WithBackPressure", // add 50, total 50 + "WithBackPressure", // add 50, total 100 + "InitFlushQueues", + "Flush", // subtract 100, total 0 + "WithBackPressure", // add 50, total 50 + "WithBackPressure", // add 50, total 100 + "InitFlushQueues", + "Flush", // subtract 100, total 0 + "WithBackPressure", // add 50, total 50 + } + require.Equal(t, expected, ops) + +} From 3e94053ded7a44d6248225ca8376f913b2897c86 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 21 Jan 2021 18:38:03 -0500 Subject: [PATCH 04/14] replayController supports a RemoveFlushedChunks fn --- pkg/ingester/replay_controller.go | 12 ++++++++++++ pkg/ingester/replay_controller_test.go | 25 +++++++++++++++++++------ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index 85dbc7510ae8d..f2e5a4ba5096b 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -1,6 +1,7 @@ package ingester import ( + fmt "fmt" "sync" "go.uber.org/atomic" @@ -9,6 +10,7 @@ import ( type Flusher interface { InitFlushQueues() Flush() + RemoveFlushedChunks() } // replayController handles coordinating backpressure between WAL replays and chunk flushing. @@ -37,6 +39,7 @@ func (c *replayController) Add(x int64) { func (c *replayController) Sub(x int64) { c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(int64(x))) + } func (c *replayController) Cur() int { @@ -45,14 +48,23 @@ func (c *replayController) Cur() int { func (c *replayController) Flush() { if c.isFlushing.CAS(false, true) { + fmt.Println("flushing, before ", c.Cur()) c.flusher.InitFlushQueues() c.flusher.Flush() + c.flusher.RemoveFlushedChunks() + fmt.Println("flushing, after ", c.Cur()) c.isFlushing.Store(false) c.cond.Broadcast() } } +// WithBackPressure is expected to call replayController.Add in the passed function to increase the managed byte count. +// It will call the function as long as there is expected room before the memory cap and will then flush data intermittently +// when needed. 
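+//
+// A call site that mirrors the recoverer looks roughly like:
+//
+//	return c.WithBackPressure(func() error {
+//		bytesAdded, err := replayOneRecord() // placeholder for the Series/Push handling in recovery.go
+//		if err != nil {
+//			return err
+//		}
+//		c.Add(int64(bytesAdded))
+//		return nil
+//	})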
func (c *replayController) WithBackPressure(fn func() error) error { + defer func() { + fmt.Println("hit, cur", c.Cur()) + }() // Account for backpressure and wait until there's enough memory to continue replaying the WAL c.cond.L.Lock() diff --git a/pkg/ingester/replay_controller_test.go b/pkg/ingester/replay_controller_test.go index 0392c8d088549..23fba728f9908 100644 --- a/pkg/ingester/replay_controller_test.go +++ b/pkg/ingester/replay_controller_test.go @@ -9,13 +9,14 @@ import ( ) type dumbFlusher struct { - onInit, onFlush func() + onInit, onFlush, postFlush func() } -func newDumbFlusher(onInit, onFlush func()) *dumbFlusher { +func newDumbFlusher(onInit, onFlush, postFlush func()) *dumbFlusher { return &dumbFlusher{ - onInit: onInit, - onFlush: onFlush, + onInit: onInit, + onFlush: onFlush, + postFlush: postFlush, } } @@ -29,6 +30,11 @@ func (f *dumbFlusher) Flush() { f.onFlush() } } +func (f *dumbFlusher) RemoveFlushedChunks() { + if f.postFlush != nil { + f.postFlush() + } +} func nilMetrics() *ingesterMetrics { return newIngesterMetrics(nil) } @@ -49,6 +55,11 @@ func TestReplayController(t *testing.T) { defer opLock.Unlock() ops = append(ops, "Flush") }, + func() { + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "PostFlush") + }, ) rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher) @@ -78,11 +89,13 @@ func TestReplayController(t *testing.T) { "WithBackPressure", // add 50, total 50 "WithBackPressure", // add 50, total 100 "InitFlushQueues", - "Flush", // subtract 100, total 0 + "Flush", // subtract 100, total 0 + "PostFlush", "WithBackPressure", // add 50, total 50 "WithBackPressure", // add 50, total 100 "InitFlushQueues", - "Flush", // subtract 100, total 0 + "Flush", // subtract 100, total 0 + "PostFlush", "WithBackPressure", // add 50, total 50 } require.Equal(t, expected, ops) From 12b81efc7fce285970a71f011436c0350016ff26 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 21 Jan 2021 19:35:06 -0500 Subject: [PATCH 05/14] custom wal flusher --- pkg/ingester/checkpoint_test.go | 123 +++++++++++++++++++++++++ pkg/ingester/flush.go | 34 +++++-- pkg/ingester/ingester.go | 22 +++-- pkg/ingester/recovery.go | 24 +---- pkg/ingester/replay_controller.go | 34 +++++-- pkg/ingester/replay_controller_test.go | 34 +------ 6 files changed, 191 insertions(+), 80 deletions(-) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 9ceb61dc1a988..8eb5dfd6f2da4 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -225,6 +225,97 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } +func TestIngesterWALBackpressureSegments(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + ingesterConfig.WAL.ReplayMemoryCeiling = 1000 + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + start := time.Now() + // Replay data 5x larger than the ceiling. 
+ totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling) + req, written := mkPush(start, totalSize) + require.Equal(t, totalSize, written) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, req) + require.NoError(t, err) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // ensure we haven't checkpointed yet + expectCheckpoint(t, walDir, false) + + // restart the ingester, ensuring we replayed from WAL. + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) +} + +func TestIngesterWALBackpressureCheckpoint(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + ingesterConfig.WAL.ReplayMemoryCeiling = 1000 + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + start := time.Now() + // Replay data 5x larger than the ceiling. + totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling) + req, written := mkPush(start, totalSize) + require.Equal(t, totalSize, written) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, req) + require.NoError(t, err) + + time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer + // ensure we have checkpointed now + expectCheckpoint(t, walDir, true) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // restart the ingester, ensuring we can replay from the checkpoint as well. 
+ i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) +} + func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { fs, err := ioutil.ReadDir(walDir) require.Nil(t, err) @@ -237,3 +328,35 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { require.True(t, found == shouldExist) } + +// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams +func mkPush(start time.Time, totalSize int) (*logproto.PushRequest, int) { + var written int + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + }, + } + totalStreams := 500 + if totalStreams > totalSize { + totalStreams = totalSize + } + + for i := 0; i < totalStreams; i++ { + req.Streams = append(req.Streams, logproto.Stream{ + Labels: fmt.Sprintf(`{foo="bar",i="%d"}`, i), + }) + + for j := 0; j < totalSize/totalStreams; j++ { + req.Streams[i].Entries = append(req.Streams[i].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(j) * time.Nanosecond), + Line: string([]byte{1}), + }) + written++ + } + + } + return req, written +} diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 42d4e724c9bbf..ffdabd14f6841 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -106,10 +106,24 @@ const ( flushReasonSynced = "synced" ) +// Note: this is called both during the WAL replay (zero or more times) +// and then after replay as well. +func (i *Ingester) InitFlushQueues() { + i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) + for j := 0; j < i.cfg.ConcurrentFlushes; j++ { + i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) + go i.flushLoop(j) + } +} + // Flush triggers a flush of all the chunks and closes the flush queues. // Called from the Lifecycler as part of the ingester shutdown. func (i *Ingester) Flush() { - i.sweepUsers(true) + i.flush(true) +} + +func (i *Ingester) flush(mayRemoveStreams bool) { + i.sweepUsers(true, mayRemoveStreams) // Close the flush queues, to unblock waiting workers. for _, flushQueue := range i.flushQueues { @@ -117,12 +131,13 @@ func (i *Ingester) Flush() { } i.flushQueuesDone.Wait() + } // FlushHandler triggers a flush of all in memory chunks. Mainly used for // local testing. 
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) { - i.sweepUsers(true) + i.sweepUsers(true, true) w.WriteHeader(http.StatusNoContent) } @@ -142,21 +157,21 @@ func (o *flushOp) Priority() int64 { } // sweepUsers periodically schedules series for flushing and garbage collects users with no series -func (i *Ingester) sweepUsers(immediate bool) { +func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) { instances := i.getInstances() for _, instance := range instances { - i.sweepInstance(instance, immediate) + i.sweepInstance(instance, immediate, mayRemoveStreams) } } -func (i *Ingester) sweepInstance(instance *instance, immediate bool) { +func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) { instance.streamsMtx.Lock() defer instance.streamsMtx.Unlock() for _, stream := range instance.streams { i.sweepStream(instance, stream, immediate) - i.removeFlushedChunks(instance, stream) + i.removeFlushedChunks(instance, stream, mayRemoveStreams) } } @@ -286,7 +301,7 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { } // must hold streamsMtx -func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { +func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) { now := time.Now() stream.chunkMtx.Lock() @@ -305,10 +320,9 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks))) // Signal how much data has been flushed to lessen any WAL replay pressure. - i.metrics.setRecoveryBytesInUse(i.currentReplayBytes.Sub(int64(subtracted))) - i.replayCond.Broadcast() + i.replayController.Sub(int64(subtracted)) - if len(stream.chunks) == 0 { + if mayRemoveStream && len(stream.chunks) == 0 { delete(instance.streamsByFP, stream.fp) delete(instance.streams, stream.labelsString) instance.index.Delete(stream.labels, stream.fp) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dc3c5551b896f..4c0d9f56c02de 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" - "go.uber.org/atomic" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/chunkenc" @@ -152,8 +151,7 @@ type Ingester struct { flushOnShutdownSwitch *OnceSwitch // Only used by WAL & flusher to coordinate backpressure during replay. - currentReplayBytes atomic.Int64 - replayCond *sync.Cond + replayController *replayController metrics *ingesterMetrics @@ -188,8 +186,8 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid tailersQuit: make(chan struct{}), metrics: metrics, flushOnShutdownSwitch: &OnceSwitch{}, - replayCond: sync.NewCond(&sync.Mutex{}), } + i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) if cfg.WAL.Enabled { if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil { @@ -220,13 +218,15 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid } func (i *Ingester) starting(ctx context.Context) error { - i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) - for j := 0; j < i.cfg.ConcurrentFlushes; j++ { - i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) - go i.flushLoop(j) - } if i.cfg.WAL.Recover { + // Ignore retain period during wal replay. 
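+		// (With the retain period zeroed, chunks flushed to relieve replay pressure are
+		// dropped from memory right away instead of lingering for RetainPeriod.)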
+ old := i.cfg.RetainPeriod + i.cfg.RetainPeriod = 0 + defer func() { + i.cfg.RetainPeriod = old + }() + // Disable the in process stream limit checks while replaying the WAL i.limiter.Disable() defer i.limiter.Enable() @@ -286,6 +286,8 @@ func (i *Ingester) starting(ctx context.Context) error { } + i.InitFlushQueues() + // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done err := i.lifecycler.StartAsync(context.Background()) if err != nil { @@ -355,7 +357,7 @@ func (i *Ingester) loop() { for { select { case <-flushTicker.C: - i.sweepUsers(false) + i.sweepUsers(false, true) case <-i.loopQuit: return diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 75890e378c2a6..11276dc5fc2d2 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -108,24 +108,8 @@ func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { // Use all available cores func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } -func (r *ingesterRecoverer) withBackPressure(fn func() error) error { - // Account for backpressure and wait until there's enough memory to continue replaying the WAL - r.ing.replayCond.L.Lock() - - // use 90% as a threshold since we'll be adding to it. - for r.ing.currentReplayBytes.Load() > int64(r.ing.cfg.WAL.ReplayMemoryCeiling)*9/10 { - r.ing.replayCond.Wait() - } - - // Don't hold the lock while executing the provided function. - // This ensures we can run functions concurrently. - r.ing.replayCond.L.Unlock() - - return fn() -} - func (r *ingesterRecoverer) Series(series *Series) error { - return r.withBackPressure(func() error { + return r.ing.replayController.WithBackPressure(func() error { inst := r.ing.getOrCreateInstance(series.UserID) @@ -146,7 +130,7 @@ func (r *ingesterRecoverer) Series(series *Series) error { r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) r.ing.metrics.recoveredBytesTotal.Add(float64(bytesAdded)) r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded)) - r.ing.metrics.setRecoveryBytesInUse(r.ing.currentReplayBytes.Add(int64(bytesAdded))) + r.ing.replayController.Add(int64(bytesAdded)) // now store the stream in the recovery map under the fingerprint originally recorded // as it's possible the newly mapped fingerprint is different. 
This is because the WAL records @@ -193,7 +177,7 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er } func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { - return r.withBackPressure(func() error { + return r.ing.replayController.WithBackPressure(func() error { out, ok := r.users.Load(userID) if !ok { return errors.Errorf("user (%s) not set during WAL replay", userID) @@ -206,7 +190,7 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) bytesAdded, _ := s.(*stream).Push(context.Background(), entries.Entries, nil) - r.ing.metrics.setRecoveryBytesInUse(r.ing.currentReplayBytes.Add(int64(bytesAdded))) + r.ing.replayController.Add(int64(bytesAdded)) return nil }) } diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index f2e5a4ba5096b..20909f03a9317 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -1,16 +1,37 @@ package ingester import ( - fmt "fmt" "sync" "go.uber.org/atomic" ) +type replayFlusher struct { + i *Ingester +} + +func (f *replayFlusher) Flush() { + f.i.InitFlushQueues() + f.i.flush(false) // flush data but don't remove streams from the ingesters + + // Similar to sweepUsers with the exception that it will not remove streams + // afterwards to prevent unlinking a stream which may receive later writes from the WAL. + // We have to do this here after the flushQueues have been drained. + instances := f.i.getInstances() + + for _, instance := range instances { + instance.streamsMtx.Lock() + + for _, stream := range instance.streams { + f.i.removeFlushedChunks(instance, stream, false) + } + + instance.streamsMtx.Unlock() + } +} + type Flusher interface { - InitFlushQueues() Flush() - RemoveFlushedChunks() } // replayController handles coordinating backpressure between WAL replays and chunk flushing. @@ -48,11 +69,7 @@ func (c *replayController) Cur() int { func (c *replayController) Flush() { if c.isFlushing.CAS(false, true) { - fmt.Println("flushing, before ", c.Cur()) - c.flusher.InitFlushQueues() c.flusher.Flush() - c.flusher.RemoveFlushedChunks() - fmt.Println("flushing, after ", c.Cur()) c.isFlushing.Store(false) c.cond.Broadcast() } @@ -62,9 +79,6 @@ func (c *replayController) Flush() { // It will call the function as long as there is expected room before the memory cap and will then flush data intermittently // when needed. 
func (c *replayController) WithBackPressure(fn func() error) error { - defer func() { - fmt.Println("hit, cur", c.Cur()) - }() // Account for backpressure and wait until there's enough memory to continue replaying the WAL c.cond.L.Lock() diff --git a/pkg/ingester/replay_controller_test.go b/pkg/ingester/replay_controller_test.go index 23fba728f9908..d979028d2a4e8 100644 --- a/pkg/ingester/replay_controller_test.go +++ b/pkg/ingester/replay_controller_test.go @@ -12,29 +12,17 @@ type dumbFlusher struct { onInit, onFlush, postFlush func() } -func newDumbFlusher(onInit, onFlush, postFlush func()) *dumbFlusher { +func newDumbFlusher(onFlush func()) *dumbFlusher { return &dumbFlusher{ - onInit: onInit, - onFlush: onFlush, - postFlush: postFlush, + onFlush: onFlush, } } -func (f *dumbFlusher) InitFlushQueues() { - if f.onInit != nil { - f.onInit() - } -} func (f *dumbFlusher) Flush() { if f.onFlush != nil { f.onFlush() } } -func (f *dumbFlusher) RemoveFlushedChunks() { - if f.postFlush != nil { - f.postFlush() - } -} func nilMetrics() *ingesterMetrics { return newIngesterMetrics(nil) } @@ -44,22 +32,12 @@ func TestReplayController(t *testing.T) { var rc *replayController flusher := newDumbFlusher( - func() { - opLock.Lock() - defer opLock.Unlock() - ops = append(ops, "InitFlushQueues") - }, func() { rc.Sub(100) // simulate flushing 100 bytes opLock.Lock() defer opLock.Unlock() ops = append(ops, "Flush") }, - func() { - opLock.Lock() - defer opLock.Unlock() - ops = append(ops, "PostFlush") - }, ) rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher) @@ -88,14 +66,10 @@ func TestReplayController(t *testing.T) { expected := []string{ "WithBackPressure", // add 50, total 50 "WithBackPressure", // add 50, total 100 - "InitFlushQueues", - "Flush", // subtract 100, total 0 - "PostFlush", + "Flush", // subtract 100, total 0 "WithBackPressure", // add 50, total 50 "WithBackPressure", // add 50, total 100 - "InitFlushQueues", - "Flush", // subtract 100, total 0 - "PostFlush", + "Flush", // subtract 100, total 0 "WithBackPressure", // add 50, total 50 } require.Equal(t, expected, ops) From 041a88da5d1f5118d58416e88d4ccc9af7bc15d6 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 09:32:54 -0500 Subject: [PATCH 06/14] encoding test determinism --- pkg/ingester/encoding_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 4762873657c68..2b122345a5349 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -42,6 +42,11 @@ func Test_Encoding_Series(t *testing.T) { err := decodeWALRecord(buf, decoded) require.Nil(t, err) + + // Since we use a pool, there can be subtle differentiations between nil slices and len(0) slices. + // Both are valid, so check length. 
+ require.Equal(t, 0, len(decoded.RefEntries)) + decoded.RefEntries = nil require.Equal(t, record, decoded) } From 6543f3b7d01342490d5201664668793b858e79dc Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 09:38:03 -0500 Subject: [PATCH 07/14] standardizes recoveredBytesTotal to replayControllers ownership --- pkg/ingester/recovery.go | 1 - pkg/ingester/replay_controller.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 11276dc5fc2d2..ef5ef062c96d4 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -128,7 +128,6 @@ func (r *ingesterRecoverer) Series(series *Series) error { return err } r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) - r.ing.metrics.recoveredBytesTotal.Add(float64(bytesAdded)) r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded)) r.ing.replayController.Add(int64(bytesAdded)) diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index 20909f03a9317..89d62a65337d8 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -55,6 +55,7 @@ func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flushe } func (c *replayController) Add(x int64) { + c.metrics.recoveredBytesTotal.Add(float64(x)) c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(int64(x))) } From 37d079737cfdb011ca0260db1a26159e57c80225 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 09:55:47 -0500 Subject: [PATCH 08/14] linting --- pkg/ingester/replay_controller.go | 4 ++-- pkg/ingester/replay_controller_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index 89d62a65337d8..af8d294bf2fd3 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -56,11 +56,11 @@ func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flushe func (c *replayController) Add(x int64) { c.metrics.recoveredBytesTotal.Add(float64(x)) - c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(int64(x))) + c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(x)) } func (c *replayController) Sub(x int64) { - c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(int64(x))) + c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(x)) } diff --git a/pkg/ingester/replay_controller_test.go b/pkg/ingester/replay_controller_test.go index d979028d2a4e8..b4e1b81af9e1e 100644 --- a/pkg/ingester/replay_controller_test.go +++ b/pkg/ingester/replay_controller_test.go @@ -9,7 +9,7 @@ import ( ) type dumbFlusher struct { - onInit, onFlush, postFlush func() + onFlush func() } func newDumbFlusher(onFlush func()) *dumbFlusher { @@ -50,7 +50,7 @@ func TestReplayController(t *testing.T) { // to the internal count, introduce a brief sleep. 
time.Sleep(time.Millisecond) - // nolint:errcheck + // nolint:errcheck,unparam go rc.WithBackPressure(func() error { rc.Add(50) opLock.Lock() From 94aa10fc209931db3775e9b06fba5908bbb94938 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 11:06:40 -0500 Subject: [PATCH 09/14] default wal memory threshold to 4gb, adds docs --- docs/sources/configuration/_index.md | 18 +++++++++++------- docs/sources/operations/storage/wal.md | 8 +++++++- pkg/ingester/wal.go | 2 +- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index d0e0ff7810261..4d55ec6e67e15 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -897,7 +897,7 @@ lifecycler: [query_store_max_look_back_period: | default = 0] -# The ingester WAL records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. +# The ingester WAL (Write Ahead Log) records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. wal: # Enables writing to WAL. # CLI flag: -ingester.wal-enabled @@ -911,13 +911,17 @@ wal: # CLI flag: -ingester.recover-from-wal [recover: | default = false] -# When WAL is enabled, should chunks be flushed to long-term storage on shutdown. -# CLI flag: -ingester.flush-on-shutdown -[flush_on_shutdown: | default = false] + # When WAL is enabled, should chunks be flushed to long-term storage on shutdown. + # CLI flag: -ingester.flush-on-shutdown + [flush_on_shutdown: | default = false] -# Interval at which checkpoints should be created. -# CLI flag: ingester.checkpoint-duration -[checkpoint_duration: | default = 5m] + # Interval at which checkpoints should be created. + # CLI flag: ingester.checkpoint-duration + [checkpoint_duration: | default = 5m] + + # Maximum memory size the WAL may use during replay. After hitting this it will flush data to storage before continuing. + # A unit suffix (KB, MB, GB) may be applied. + [replay_memory_ceiling: | default = 4GB] ``` ## consul_config diff --git a/docs/sources/operations/storage/wal.md b/docs/sources/operations/storage/wal.md index a5c1e99db99d6..93f5df18d14d4 100644 --- a/docs/sources/operations/storage/wal.md +++ b/docs/sources/operations/storage/wal.md @@ -26,6 +26,11 @@ In the event the underlying WAL disk is full, Loki will not fail incoming writes Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens. + +### Backpressure + +The WAL also includes a backpressure mechanism to allow a large WAL to be replayed within a smaller memory bound. This is helpful after bad scenarios (i.e. an outage) when a WAL has grown past the point it may be recovered in memory. In this case, the ingester will track the amount of data being replayed and once it's passed the `ingester.wal-replay-memory-ceiling` threshold, will flush to storage. When this happens, it's likely that Loki's attempt to deduplicate chunks via content addressable storage will suffer. We deemed this efficiency loss an acceptable tradeoff considering how it simplifies operation and that it should not occur during regular operation (rollouts, rescheduling) where the WAL can be replayed without triggering this threshold. 
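+
+For example, to cap replay memory at 2GB (an illustrative value; in practice pick a high percentage, e.g. ~75%, of the ingester's available memory) the relevant configuration would look something like:
+
+```yaml
+ingester:
+  wal:
+    enabled: true
+    dir: /loki/wal
+    recover: true
+    replay_memory_ceiling: 2GB
+```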
+ ### Metrics ## Changes to deployment @@ -38,6 +43,7 @@ Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be * `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. * `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this. * If you are going to enable WAL, it is advisable to always set this to `true`. + * `--ingester.wal-replay-memory-ceiling` (default 4GB) may be set higher/lower depending on your resource settings. It handles memory pressure during WAL replays, allowing a WAL many times larger than available memory to be replayed. This is provided to minimize reconciliation time after very bad situations, i.e. an outage, and will likely not impact regular operations/rollouts _at all_. We suggest setting this to a high percentage (~75%) of available memory. ## Changes in lifecycle when WAL is enabled @@ -78,7 +84,7 @@ When scaling down, we must ensure existing data on the leaving ingesters are flu Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters, the ingesters which will be shutdown according to statefulset rules are `ingester-3` and then `ingester-2`. -Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and shut down the ingesters (while also removing itself from the ring). +Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and remove itself from the ring, after which it will register as unready and may be deleted. After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2. diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 0a559b27336b0..e788dc34bce93 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -22,7 +22,7 @@ var ( ) const walSegmentSize = wal.DefaultSegmentSize * 4 -const defaultCeiling = 8 << 30 // 8GB +const defaultCeiling = 4 << 30 // 4GB type WALConfig struct { Enabled bool `yaml:"enabled"` From 245e597fabdcbe262c62921ae8f93fab66ea9ebf Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 11:08:07 -0500 Subject: [PATCH 10/14] fix test after v3 chunk schema change --- pkg/chunkenc/memchunk_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index ce63a3cbea05d..eb23085408bb5 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -920,13 +920,6 @@ func TestCheckpointEncoding(t *testing.T) { cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize) require.Nil(t, err) - // TODO(owen-d): remove once v3+ is the default chunk version - // because that is when we started serializing uncompressed size. - // Until then, nil them out in order to ease equality testing. 
- for i := range c.blocks { - c.blocks[i].uncompressedSize = 0 - } - require.Equal(t, c, cpy) } From 1b981ffad68072e3e553e8e8d5a8e22978fe8ee1 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 22 Jan 2021 11:45:02 -0500 Subject: [PATCH 11/14] more lenient expectCheckpoint function --- pkg/ingester/checkpoint_test.go | 38 +++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index c70b4649a762a..5a6f504c411b7 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -111,7 +111,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // ensure we haven't checkpointed yet - expectCheckpoint(t, walDir, false) + expectCheckpoint(t, walDir, false, time.Second) // restart the ingester i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) @@ -122,9 +122,8 @@ func TestIngesterWAL(t *testing.T) { // ensure we've recovered data from wal segments ensureIngesterData(ctx, t, start, end, i) - time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer // ensure we have checkpointed now - expectCheckpoint(t, walDir, true) + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) @@ -278,7 +277,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // ensure we haven't checkpointed yet - expectCheckpoint(t, walDir, false) + expectCheckpoint(t, walDir, false, time.Second) // restart the ingester, ensuring we replayed from WAL. i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) @@ -320,9 +319,8 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { _, err = i.Push(ctx, req) require.NoError(t, err) - time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer // ensure we have checkpointed now - expectCheckpoint(t, walDir, true) + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) @@ -333,17 +331,29 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) } -func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { - fs, err := ioutil.ReadDir(walDir) - require.Nil(t, err) - var found bool - for _, f := range fs { - if _, err := checkpointIndex(f.Name(), false); err == nil { - found = true +func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Duration) { + deadline := time.After(max) + for { + select { + case <-deadline: + require.Fail(t, "timeout while waiting for checkpoint existence:", shouldExist) + default: + <-time.After(max / 10) // check 10x over the duration + } + + fs, err := ioutil.ReadDir(walDir) + require.Nil(t, err) + var found bool + for _, f := range fs { + if _, err := checkpointIndex(f.Name(), false); err == nil { + found = true + } + } + if found == shouldExist { + return } } - require.True(t, found == shouldExist) } // mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams From b7de9eff05ded83c596ac2cd20d530164b167e7e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 26 Jan 2021 10:36:26 -0500 Subject: [PATCH 12/14] replayController 
protects against flush race condition --- pkg/ingester/replay_controller.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go index af8d294bf2fd3..0b72a618ffcc5 100644 --- a/pkg/ingester/replay_controller.go +++ b/pkg/ingester/replay_controller.go @@ -72,7 +72,13 @@ func (c *replayController) Flush() { if c.isFlushing.CAS(false, true) { c.flusher.Flush() c.isFlushing.Store(false) + + // Broadcast after lock is acquired to prevent race conditions with cpu scheduling + // where the flush code could finish before the goroutine which initiated it gets to call + // c.cond.Wait() + c.cond.L.Lock() c.cond.Broadcast() + c.cond.L.Unlock() } } From 1ee8cdfbb73c806ce25580894fa49b95c5afe24d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 27 Jan 2021 13:26:28 -0500 Subject: [PATCH 13/14] adds replay_memory_ceiling to wal jsonnet --- production/ksonnet/loki/wal.libsonnet | 1 + 1 file changed, 1 insertion(+) diff --git a/production/ksonnet/loki/wal.libsonnet b/production/ksonnet/loki/wal.libsonnet index 695cbcf9384a9..d3188c68175a0 100644 --- a/production/ksonnet/loki/wal.libsonnet +++ b/production/ksonnet/loki/wal.libsonnet @@ -13,6 +13,7 @@ enabled: true, dir: '/loki/wal', recover: true, + replay_memory_ceiling: '9GB', // between the requests & limits }, }, }), From 78a44bf319c473aa3cd64aa73cc797ff86920245 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 27 Jan 2021 15:44:30 -0500 Subject: [PATCH 14/14] replay ceiling help msg --- pkg/ingester/wal.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index e788dc34bce93..59bd6037a156b 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -2,7 +2,6 @@ package ingester import ( "flag" - fmt "fmt" "sync" "time" @@ -50,7 +49,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { // Need to set default here cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling) - f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", fmt.Sprintf("How much memory the WAL may use during replay before it needs to flush chunks to storage, i.e. 10GB. Defaults to %s.", flagext.ByteSize(defaultCeiling).String())) + f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", "How much memory the WAL may use during replay before it needs to flush chunks to storage, i.e. 10GB. We suggest setting this to a high percentage (~75%) of available memory.") } // WAL interface allows us to have a no-op WAL when the WAL is disabled.