diff --git a/go.mod b/go.mod index 14ce31da489..1beb86953ac 100644 --- a/go.mod +++ b/go.mod @@ -257,7 +257,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240612071315-901d3d434fae // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index 59eeb67d9ae..4482ead06be 100644 --- a/go.sum +++ b/go.sum @@ -517,8 +517,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 h1:PCmcaGd1Sb8VLosUGWDFKV5gE1A/yoBiPQuRmTSAjZY= -github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM= +github.com/grafana/mimir-prometheus v0.0.0-20240612071315-901d3d434fae h1:R9VKkJFssVvQ3Lh+5xXeG3rfgaWkPGnwwb5jseZnxyU= +github.com/grafana/mimir-prometheus v0.0.0-20240612071315-901d3d434fae/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= diff --git 
a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go index 0d7b2c2a5e4..9d359fe7264 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/postings_for_matchers_cache.go @@ -3,6 +3,7 @@ package tsdb import ( "container/list" "context" + "errors" "fmt" "strings" "sync" @@ -82,9 +83,14 @@ type PostingsForMatchersCache struct { // timeNow is the time.Now that can be replaced for testing purposes timeNow func() time.Time + // postingsForMatchers can be replaced for testing purposes postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) + // onPromiseExecutionDoneBeforeHook is used for testing purposes. It allows to hook at the + // beginning of onPromiseExecutionDone() execution. + onPromiseExecutionDoneBeforeHook func() + tracer trace.Tracer // Preallocated for performance ttlAttrib attribute.KeyValue @@ -121,8 +127,13 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I } type postingsForMatcherPromise struct { - done chan struct{} + // Keep track of all callers contexts in order to cancel the execution context if all + // callers contexts get canceled. + callersCtxTracker *contextsTracker + // The result of the promise is stored either in cloner or err (only one of the two is set). + // Do not access these fields until the done channel is closed. 
+ done chan struct{} cloner *index.PostingsCloner err error } @@ -147,19 +158,54 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { span := trace.SpanFromContext(ctx) + promiseCallersCtxTracker, promiseExecCtx := newContextsTracker() promise := &postingsForMatcherPromise{ - done: make(chan struct{}), + done: make(chan struct{}), + callersCtxTracker: promiseCallersCtxTracker, } + // Add the caller context to the ones tracked by the new promise. + // + // It's important to do it here so that if the promise will be stored, then the caller's context is already + // tracked. Otherwise, if the promise will not be stored (because there's another in-flight promise for the + // same label matchers) then it's not a problem, and resources will be released. + // + // Skipping the error checking because it can't happen here. + _ = promise.callersCtxTracker.add(ctx) + key := matchersKey(ms) - oldPromise, loaded := c.calls.LoadOrStore(key, promise) - if loaded { - // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine + + if oldPromiseValue, loaded := c.calls.LoadOrStore(key, promise); loaded { + // The new promise hasn't been stored because there's already an in-flight promise + // for the same label matchers. We should just wait for it. + + // Release the resources created by the new promise, that will not be used. + close(promise.done) + promise.callersCtxTracker.close() + + oldPromise := oldPromiseValue.(*postingsForMatcherPromise) + + // Add the caller context to the ones tracked by the old promise (currently in-flight). 
+ if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) { + // We've hit a race condition happening when the "loaded" promise execution was just canceled, + // but it hasn't been removed from map of calls yet, so the old promise was loaded anyway. + // + // We expect this race condition to be infrequent. In this case we simply skip the cache and + // pass through the execution to the underlying postingsForMatchers(). + span.AddEvent("looked up in-flight postingsForMatchers promise, but the promise was just canceled due to a race condition: skipping the cache", trace.WithAttributes( + attribute.String("cache_key", key), + )) + + return func(ctx context.Context) (index.Postings, error) { + return c.postingsForMatchers(ctx, ix, ms...) + } + } + span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( attribute.String("cache_key", key), )) - close(promise.done) - return oldPromise.(*postingsForMatcherPromise).result + + return oldPromise.result } span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(attribute.String("cache_key", key))) @@ -167,19 +213,25 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex // promise was stored, close its channel after fulfilment defer close(promise.done) - // Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with - // its own context. Also, keep the call independent of this particular context, since the promise will be reused. - // FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have - // cancelled their context? - if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { + // The execution context will be canceled only once all callers contexts will be canceled. This way we: + // 1. 
Do not cancel postingsForMatchers() if the input ctx is cancelled, but another goroutine is waiting + // for the promise result. + // 2. Cancel postingsForMatchers() once all callers contexts have been canceled, so that we don't waste + // resources computing postingsForMatchers() if all requests have been canceled (this is particularly + // important if the postingsForMatchers() is very slow due to expensive regexp matchers). + if postings, err := c.postingsForMatchers(promiseExecCtx, ix, ms...); err != nil { promise.err = err } else { promise.cloner = index.NewPostingsCloner(postings) } + // The execution terminated (or has been canceled). We have to close the tracker to release resources. + // It's important to close it before computing the promise size, so that the actual size is smaller. + promise.callersCtxTracker.close() + sizeBytes := int64(len(key) + size.Of(promise)) - c.created(ctx, key, c.timeNow(), sizeBytes) + c.onPromiseExecutionDone(ctx, key, c.timeNow(), sizeBytes, promise.err) return promise.result } @@ -236,13 +288,28 @@ func (c *PostingsForMatchersCache) evictHead() { c.cachedBytes -= oldest.sizeBytes } -// created has to be called when returning from the PostingsForMatchers call that creates the promise. -// the ts provided should be the call time. -func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) { +// onPromiseExecutionDone must be called once the execution of PostingsForMatchers promise is done. +// The input err contains details about any error that could have occurred when executing it. +// The input ts is the function call time. +func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) { span := trace.SpanFromContext(ctx) + // Call the registered hook, if any. It's used only for testing purposes. 
+ if c.onPromiseExecutionDoneBeforeHook != nil { + c.onPromiseExecutionDoneBeforeHook() + } + + // Do not cache if cache is disabled. if c.ttl <= 0 { - span.AddEvent("deleting cached promise since c.ttl <= 0") + span.AddEvent("not caching promise result because configured TTL is <= 0") + c.calls.Delete(key) + return + } + + // Do not cache if the promise execution was canceled (it gets cancelled once all the callers contexts have + // been canceled). + if errors.Is(err, context.Canceled) { + span.AddEvent("not caching promise result because execution has been canceled") c.calls.Delete(key) return } @@ -298,3 +365,112 @@ func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(ctx context.Con } var _ IndexReader = indexReaderWithPostingsForMatchers{} + +// errContextsTrackerClosed is the reason to identify contextsTracker has been explicitly closed by calling close(). +// +// This error is a struct instead of a globally generic error so that postingsForMatcherPromise computed size is smaller +// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise). +type errContextsTrackerClosed struct{} + +func (e errContextsTrackerClosed) Error() string { + return "contexts tracker is closed" +} + +// errContextsTrackerCanceled is the reason to identify contextsTracker has been automatically closed because +// all tracked contexts have been canceled. +// +// This error is a struct instead of a globally generic error so that postingsForMatcherPromise computed size is smaller +// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise). +type errContextsTrackerCanceled struct{} + +func (e errContextsTrackerCanceled) Error() string { + return "contexts tracker has been canceled" +} + +// contextsTracker is responsible for monitoring multiple context.Context and provides an execution context +// that gets canceled once all monitored context.Context are done. 
+type contextsTracker struct { + cancelExecCtx context.CancelFunc + + mx sync.Mutex + closedWithReason error // Track whether the tracker is closed and why. The tracker is not closed if this is nil. + trackedCount int // Number of tracked contexts. + trackedStopFuncs []func() bool // The stop watching functions for all tracked contexts. +} + +func newContextsTracker() (*contextsTracker, context.Context) { + t := &contextsTracker{} + + // Create a new execution context that will be canceled only once all tracked contexts are done. + var execCtx context.Context + execCtx, t.cancelExecCtx = context.WithCancel(context.Background()) + + return t, execCtx +} + +// add the input ctx to the group of monitored context.Context. +// Returns the reason the tracker was closed (a non-nil error) if the input context couldn't be added because the tracker is already closed. +func (t *contextsTracker) add(ctx context.Context) error { + t.mx.Lock() + defer t.mx.Unlock() + + // Check if we're already done. + if t.closedWithReason != nil { + return t.closedWithReason + } + + // Register a function that will be called once the tracked context is done. + t.trackedCount++ + t.trackedStopFuncs = append(t.trackedStopFuncs, context.AfterFunc(ctx, t.onTrackedContextDone)) + + return nil +} + +// close the tracker. When the tracker is closed, the execution context is canceled +// and resources are released. +// +// This function must be called once done to not leak resources. +func (t *contextsTracker) close() { + t.mx.Lock() + defer t.mx.Unlock() + + t.unsafeClose(errContextsTrackerClosed{}) +} + +// unsafeClose must be called with the t.mx lock held. +func (t *contextsTracker) unsafeClose(reason error) { + if t.closedWithReason != nil { + return + } + + t.cancelExecCtx() + + // Stop watching the tracked contexts. It's safe to call the stop function on a context + // that was already done. 
+ for _, fn := range t.trackedStopFuncs { + fn() + } + + t.trackedCount = 0 + t.trackedStopFuncs = nil + t.closedWithReason = reason +} + +func (t *contextsTracker) onTrackedContextDone() { + t.mx.Lock() + defer t.mx.Unlock() + + t.trackedCount-- + + // If this was the last context to be tracked, we can close the tracker and cancel the execution context. + if t.trackedCount == 0 { + t.unsafeClose(errContextsTrackerCanceled{}) + } +} + +func (t *contextsTracker) trackedContextsCount() int { + t.mx.Lock() + defer t.mx.Unlock() + + return t.trackedCount +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 45ec8e3ddfe..045ed30eebd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -935,7 +935,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240612071315-901d3d434fae ## explicit; go 1.21 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1550,7 +1550,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240612071315-901d3d434fae # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6