From 72e0775b6c4a097bfdf8558bea69d030ffa0c753 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Wed, 12 Jun 2024 09:17:41 +0200
Subject: [PATCH] [r294] Update to mimir-prometheus w/ PostingsForMatchersCache context cancellation and MemPostings fix

Signed-off-by: Marco Pracucci
---
 go.mod | 2 +-
 go.sum | 4 +-
 .../prometheus/tsdb/index/postings.go | 124 ++++++++---
 .../tsdb/postings_for_matchers_cache.go | 210 ++++++++++++++++--
 vendor/modules.txt | 4 +-
 5 files changed, 288 insertions(+), 56 deletions(-)

diff --git a/go.mod b/go.mod
index 63b82fb2861..605ba990707 100644
--- a/go.mod
+++ b/go.mod
@@ -257,7 +257,7 @@ require (
 )

 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240515171138-a3ac8a4266ca
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240612071310-1055b2587904

 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
diff --git a/go.sum b/go.sum
index 70221c2109a..6c39654d179 100644
--- a/go.sum
+++ b/go.sum
@@ -517,8 +517,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp
 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20240515171138-a3ac8a4266ca h1:4JoJnC0wQ83X1uFzpvjFxdDOtT6Y15vKFXLNK1U4YHQ=
-github.com/grafana/mimir-prometheus v0.0.0-20240515171138-a3ac8a4266ca/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM=
+github.com/grafana/mimir-prometheus v0.0.0-20240612071310-1055b2587904 h1:tYh4rHfofUWEisXtYcfKyDnfT25eaAIkXwYqJ27L2EQ=
+github.com/grafana/mimir-prometheus v0.0.0-20240612071310-1055b2587904/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
index 6cdfa56b13a..0e33ebbbf28 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
@@ -289,41 +289,61 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
 // Delete removes all ids in the given map from the postings lists.
 func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
-	var keys, vals []string
+	// Take the optimistic read lock for the entire method,
+	// and only lock for writing when we actually find something to delete.
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()

 	// Collect all keys relevant for deletion once. New keys added afterwards
 	// can by definition not be affected by any of the given deletes.
-	p.mtx.RLock()
+	keys := make([]string, 0, len(p.m))
+	maxVals := 0
 	for n := range p.m {
 		keys = append(keys, n)
+		if len(p.m[n]) > maxVals {
+			maxVals = len(p.m[n])
+		}
 	}
-	p.mtx.RUnlock()
+	vals := make([]string, 0, maxVals)

 	for _, n := range keys {
-		p.mtx.RLock()
+		// Copy the values and iterate the copy: if we unlock in the loop below,
+		// another goroutine might modify the map while we are part-way through it.
 		vals = vals[:0]
 		for v := range p.m[n] {
 			vals = append(vals, v)
 		}
-		p.mtx.RUnlock()

 		// For each posting we first analyse whether the postings list is affected by the deletes.
-		// If yes, we actually reallocate a new postings list.
-		for _, l := range vals {
-			// Only lock for processing one postings list so we don't block reads for too long.
-			p.mtx.Lock()
-
+		// If no, we remove the label value from the vals list.
+		// This way we only need to Lock once later.
+		for i := 0; i < len(vals); {
 			found := false
-			for _, id := range p.m[n][l] {
+			for _, id := range p.m[n][vals[i]] {
 				if _, ok := deleted[id]; ok {
+					i++
 					found = true
 					break
 				}
 			}
+
 			if !found {
-				p.mtx.Unlock()
-				continue
+				// This label value doesn't contain deleted ids, so no need to process it later.
+				// We continue with the same index, which now holds the value that was last in the list.
+				vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
 			}
+		}
+
+		// If no label values have deleted ids, just continue.
+		if len(vals) == 0 {
+			continue
+		}
+
+		// The only vals left here are the ones that contain deleted ids.
+		// Now we take the write lock and remove the ids.
+		p.mtx.RUnlock()
+		p.mtx.Lock()
+		for _, l := range vals {
 			repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))

 			for _, id := range p.m[n][l] {
@@ -336,13 +356,14 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
 			} else {
 				delete(p.m[n], l)
 			}
-			p.mtx.Unlock()
 		}
-		p.mtx.Lock()
+
+		// Delete the key if we removed all values.
 		if len(p.m[n]) == 0 {
 			delete(p.m, n)
 		}
 		p.mtx.Unlock()
+		p.mtx.RLock()
 	}
 }

@@ -398,16 +419,65 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 }

 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
+	// We'll copy the values into a slice and then match over that;
+	// this way we don't need to hold the mutex while we're matching,
+	// which can be slow (seconds) if the match function is a huge regex.
+	// Holding this lock prevents new series from being added (slows down the write path)
+	// and blocks the compaction process.
+	//
+	// Also, benchmarking shows that first copying the values into a slice and then matching over that is
+	// faster than matching over the map keys directly, at least on AMD64.
+	vals := p.labelValues(name)
+	for i, count := 0, 1; i < len(vals); count++ {
+		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
+			return ErrPostings(ctx.Err())
+		}
+
+		if match(vals[i]) {
+			i++
+			continue
+		}
+
+		// Didn't match, bring the last value to this position, make the slice shorter and check again.
+		// The order of the slice doesn't matter as it comes from a map iteration.
+		vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
+	}
+
+	// If none matched (or this label had no values), no need to grab the lock again.
+	if len(vals) == 0 {
+		return EmptyPostings()
+	}
+
+	// Now `vals` only contains the values that matched; get their postings.
+	its := make([]Postings, 0, len(vals))
 	p.mtx.RLock()
+	e := p.m[name]
+	for _, v := range vals {
+		if refs, ok := e[v]; ok {
+			// Some of the values may have been garbage-collected in the meantime; this is fine, we'll just skip them.
+			// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
+			// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
+			// because the series were deleted already.
+			its = append(its, NewListPostings(refs))
+		}
+	}
+	// Let the mutex go before merging.
+	p.mtx.RUnlock()
+
+	return Merge(ctx, its...)
+}
+
+// labelValues returns a slice of label values for the given label name.
+// It will take the read lock.
+func (p *MemPostings) labelValues(name string) []string {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()

 	e := p.m[name]
 	if len(e) == 0 {
-		p.mtx.RUnlock()
-		return EmptyPostings()
+		return nil
 	}

-	// Benchmarking shows that first copying the values into a slice and then matching over that is
-	// faster than matching over the map keys directly, at least on AMD64.
 	vals := make([]string, 0, len(e))
 	for v, srs := range e {
 		if len(srs) > 0 {
@@ -415,21 +485,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
 		}
 	}

-	var its []Postings
-	count := 1
-	for _, v := range vals {
-		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
-			p.mtx.RUnlock()
-			return ErrPostings(ctx.Err())
-		}
-		count++
-		if match(v) {
-			its = append(its, NewListPostings(e[v]))
-		}
-	}
-	p.mtx.RUnlock()
-
-	return Merge(ctx, its...)
+	return vals
 }

 // ExpandPostings returns the postings expanded as a slice.
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go
index 0d7b2c2a5e4..9d359fe7264 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go
@@ -3,6 +3,7 @@ package tsdb
 import (
 	"container/list"
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"sync"
@@ -82,9 +83,14 @@ type PostingsForMatchersCache struct {
 	// timeNow is the time.Now that can be replaced for testing purposes
 	timeNow func() time.Time

+	// postingsForMatchers can be replaced for testing purposes
 	postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)

+	// onPromiseExecutionDoneBeforeHook is used for testing purposes. It allows hooking into the
+	// beginning of onPromiseExecutionDone() execution.
+	onPromiseExecutionDoneBeforeHook func()
+
 	tracer trace.Tracer
 	// Preallocated for performance
 	ttlAttrib   attribute.KeyValue
@@ -121,8 +127,13 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I
 }

 type postingsForMatcherPromise struct {
-	done chan struct{}
+	// Keep track of all callers' contexts in order to cancel the execution context if all
+	// callers' contexts get canceled.
+	callersCtxTracker *contextsTracker

+	// The result of the promise is stored either in cloner or err (only one of the two is set).
+	// Do not access these fields until the done channel is closed.
+	done   chan struct{}
 	cloner *index.PostingsCloner
 	err    error
 }
@@ -147,19 +158,54 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
 	span := trace.SpanFromContext(ctx)

+	promiseCallersCtxTracker, promiseExecCtx := newContextsTracker()
 	promise := &postingsForMatcherPromise{
-		done: make(chan struct{}),
+		done:              make(chan struct{}),
+		callersCtxTracker: promiseCallersCtxTracker,
 	}

+	// Add the caller context to the ones tracked by the new promise.
+	//
+	// It's important to do it here so that, if the promise gets stored, the caller's context is already
+	// tracked. Otherwise, if the promise is not stored (because there's another in-flight promise for the
+	// same label matchers), it's not a problem, and resources will be released.
+	//
+	// Skipping the error checking because it can't happen here.
+	_ = promise.callersCtxTracker.add(ctx)
+
 	key := matchersKey(ms)
-	oldPromise, loaded := c.calls.LoadOrStore(key, promise)
-	if loaded {
-		// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
+
+	if oldPromiseValue, loaded := c.calls.LoadOrStore(key, promise); loaded {
+		// The new promise hasn't been stored because there's already an in-flight promise
+		// for the same label matchers. We should just wait for it.
+
+		// Release the resources created by the new promise, since it will not be used.
+		close(promise.done)
+		promise.callersCtxTracker.close()
+
+		oldPromise := oldPromiseValue.(*postingsForMatcherPromise)
+
+		// Add the caller context to the ones tracked by the old promise (currently in-flight).
+		if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) {
+			// We've hit a race condition happening when the "loaded" promise execution was just canceled,
+			// but it hasn't been removed from the map of calls yet, so the old promise was loaded anyway.
+			//
+			// We expect this race condition to be infrequent. In this case we simply skip the cache and
+			// pass through the execution to the underlying postingsForMatchers().
+			span.AddEvent("looked up in-flight postingsForMatchers promise, but the promise was just canceled due to a race condition: skipping the cache", trace.WithAttributes(
+				attribute.String("cache_key", key),
+			))
+
+			return func(ctx context.Context) (index.Postings, error) {
+				return c.postingsForMatchers(ctx, ix, ms...)
+			}
+		}
+
 		span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
 			attribute.String("cache_key", key),
 		))
-		close(promise.done)
-		return oldPromise.(*postingsForMatcherPromise).result
+
+		return oldPromise.result
 	}

 	span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(attribute.String("cache_key", key)))
@@ -167,19 +213,25 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
 	// promise was stored, close its channel after fulfilment
 	defer close(promise.done)

-	// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
-	// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
-	// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
-	// cancelled their context?
-	if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
+	// The execution context will be canceled only once all callers' contexts have been canceled. This way we:
+	// 1. Do not cancel postingsForMatchers() if the input ctx is cancelled, but another goroutine is waiting
+	//    for the promise result.
+	// 2. Cancel postingsForMatchers() once all callers' contexts have been canceled, so that we don't waste
+	//    resources computing postingsForMatchers() if all requests have been canceled (this is particularly
+	//    important if the postingsForMatchers() is very slow due to expensive regexp matchers).
+	if postings, err := c.postingsForMatchers(promiseExecCtx, ix, ms...); err != nil {
 		promise.err = err
 	} else {
 		promise.cloner = index.NewPostingsCloner(postings)
 	}

+	// The execution terminated (or has been canceled). We have to close the tracker to release resources.
+	// It's important to close it before computing the promise size, so that the actual size is smaller.
+	promise.callersCtxTracker.close()
+
 	sizeBytes := int64(len(key) + size.Of(promise))

-	c.created(ctx, key, c.timeNow(), sizeBytes)
+	c.onPromiseExecutionDone(ctx, key, c.timeNow(), sizeBytes, promise.err)
 	return promise.result
 }

@@ -236,13 +288,28 @@ func (c *PostingsForMatchersCache) evictHead() {
 	c.cachedBytes -= oldest.sizeBytes
 }

-// created has to be called when returning from the PostingsForMatchers call that creates the promise.
-// the ts provided should be the call time.
-func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
+// onPromiseExecutionDone must be called once the execution of the PostingsForMatchers promise is done.
+// The input err contains details about any error that could have occurred when executing it.
+// The input ts is the function call time.
+func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) {
 	span := trace.SpanFromContext(ctx)

+	// Call the registered hook, if any. It's used only for testing purposes.
+	if c.onPromiseExecutionDoneBeforeHook != nil {
+		c.onPromiseExecutionDoneBeforeHook()
+	}
+
+	// Do not cache if the cache is disabled.
 	if c.ttl <= 0 {
-		span.AddEvent("deleting cached promise since c.ttl <= 0")
+		span.AddEvent("not caching promise result because configured TTL is <= 0")
+		c.calls.Delete(key)
+		return
+	}
+
+	// Do not cache if the promise execution was canceled (it gets canceled once all the callers' contexts have
+	// been canceled).
+	if errors.Is(err, context.Canceled) {
+		span.AddEvent("not caching promise result because execution has been canceled")
 		c.calls.Delete(key)
 		return
 	}
@@ -298,3 +365,112 @@ func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(ctx context.Con
 }

 var _ IndexReader = indexReaderWithPostingsForMatchers{}
+
+// errContextsTrackerClosed is the error used to identify that the contextsTracker has been explicitly closed by calling close().
+//
+// This error is a struct instead of a globally generic error so that the computed size of postingsForMatcherPromise is smaller
+// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise).
+type errContextsTrackerClosed struct{}
+
+func (e errContextsTrackerClosed) Error() string {
+	return "contexts tracker is closed"
+}
+
+// errContextsTrackerCanceled is the error used to identify that the contextsTracker has been automatically closed because
+// all tracked contexts have been canceled.
+// +// This error is a struct instead of a globally generic error so that postingsForMatcherPromise computed size is smaller +// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise). +type errContextsTrackerCanceled struct{} + +func (e errContextsTrackerCanceled) Error() string { + return "contexts tracker has been canceled" +} + +// contextsTracker is responsible to monitor multiple context.Context and provides an execution +// that gets canceled once all monitored context.Context have done. +type contextsTracker struct { + cancelExecCtx context.CancelFunc + + mx sync.Mutex + closedWithReason error // Track whether the tracker is closed and why. The tracker is not closed if this is nil. + trackedCount int // Number of tracked contexts. + trackedStopFuncs []func() bool // The stop watching functions for all tracked contexts. +} + +func newContextsTracker() (*contextsTracker, context.Context) { + t := &contextsTracker{} + + // Create a new execution context that will be canceled only once all tracked contexts have done. + var execCtx context.Context + execCtx, t.cancelExecCtx = context.WithCancel(context.Background()) + + return t, execCtx +} + +// add the input ctx to the group of monitored context.Context. +// Returns false if the input context couldn't be added to the tracker because the tracker is already closed. +func (t *contextsTracker) add(ctx context.Context) error { + t.mx.Lock() + defer t.mx.Unlock() + + // Check if we've already done. + if t.closedWithReason != nil { + return t.closedWithReason + } + + // Register a function that will be called once the tracked context has done. + t.trackedCount++ + t.trackedStopFuncs = append(t.trackedStopFuncs, context.AfterFunc(ctx, t.onTrackedContextDone)) + + return nil +} + +// close the tracker. When the tracker is closed, the execution context is canceled +// and resources releases. +// +// This function must be called once done to not leak resources. +func (t *contextsTracker) close() { + t.mx.Lock() + defer t.mx.Unlock() + + t.unsafeClose(errContextsTrackerClosed{}) +} + +// unsafeClose must be called with the t.mx lock hold. +func (t *contextsTracker) unsafeClose(reason error) { + if t.closedWithReason != nil { + return + } + + t.cancelExecCtx() + + // Stop watching the tracked contexts. It's safe to call the stop function on a context + // for which was already done. + for _, fn := range t.trackedStopFuncs { + fn() + } + + t.trackedCount = 0 + t.trackedStopFuncs = nil + t.closedWithReason = reason +} + +func (t *contextsTracker) onTrackedContextDone() { + t.mx.Lock() + defer t.mx.Unlock() + + t.trackedCount-- + + // If this was the last context to be tracked, we can close the tracker and cancel the execution context. 
+	if t.trackedCount == 0 {
+		t.unsafeClose(errContextsTrackerCanceled{})
+	}
+}
+
+func (t *contextsTracker) trackedContextsCount() int {
+	t.mx.Lock()
+	defer t.mx.Unlock()
+
+	return t.trackedCount
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index c73d8e72e55..2fa7a6e938b 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -935,7 +935,7 @@ github.com/prometheus/exporter-toolkit/web
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240515171138-a3ac8a4266ca
+# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240612071310-1055b2587904
 ## explicit; go 1.21
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1550,7 +1550,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
 sigs.k8s.io/yaml/goyaml.v3
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240515171138-a3ac8a4266ca
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240612071310-1055b2587904
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
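
The following is a minimal, standalone Go sketch, not part of the patch above, of the cancellation pattern that the new contextsTracker implements: several caller contexts share a single execution context, and that execution context is canceled only after the last tracked caller context is done, using context.AfterFunc (assumes Go 1.21+). The sharedTracker type and every other name in this example are invented for illustration; the actual implementation is the contextsTracker code added by the diff.

// Standalone sketch of the "cancel only when the last caller is gone" pattern.
// Assumption: Go 1.21+ for context.AfterFunc. Names are illustrative only.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sharedTracker cancels one shared execution context only after every tracked
// caller context is done, mirroring how the promise execution context in
// PostingsForMatchersCache outlives any single caller.
type sharedTracker struct {
	cancel context.CancelFunc

	mx      sync.Mutex
	closed  bool
	tracked int
	stops   []func() bool
}

func newSharedTracker() (*sharedTracker, context.Context) {
	t := &sharedTracker{}
	var execCtx context.Context
	execCtx, t.cancel = context.WithCancel(context.Background())
	return t, execCtx
}

// add registers a caller context to watch; context.AfterFunc invokes onDone
// once that caller's context is done.
func (t *sharedTracker) add(ctx context.Context) {
	t.mx.Lock()
	defer t.mx.Unlock()
	if t.closed {
		return
	}
	t.tracked++
	t.stops = append(t.stops, context.AfterFunc(ctx, t.onDone))
}

// onDone runs when one tracked caller context is done; the shared execution
// context is canceled only when the last tracked caller goes away.
func (t *sharedTracker) onDone() {
	t.mx.Lock()
	defer t.mx.Unlock()
	t.tracked--
	if t.tracked == 0 && !t.closed {
		t.closed = true
		t.cancel()
		for _, stop := range t.stops {
			stop() // Safe on already-fired AfterFuncs; it just returns false.
		}
	}
}

func main() {
	tracker, execCtx := newSharedTracker()

	// Two callers share the same execution context.
	caller1, cancel1 := context.WithCancel(context.Background())
	caller2, cancel2 := context.WithCancel(context.Background())
	tracker.add(caller1)
	tracker.add(caller2)

	cancel1() // First caller goes away: the shared execution keeps running.
	time.Sleep(10 * time.Millisecond)
	fmt.Println("after first cancel:", execCtx.Err()) // <nil>

	cancel2() // Last caller goes away: the shared execution is canceled.
	time.Sleep(10 * time.Millisecond)
	fmt.Println("after second cancel:", execCtx.Err()) // context.Canceled
}

Under these assumptions, the sketch reflects what the patch does with promiseExecCtx: a caller cancelling its own request does not abort the shared postingsForMatchers() execution while another caller is still waiting on the promise, and the work is canceled (and its result not cached) only once every tracked caller context is done.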