From 817ce2541b79977140360fd9d0e369a06a9fb3de Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jun 2024 09:42:29 +0200 Subject: [PATCH] [r293] Update to mimir-prometheus w/ MemPostings locking improvements Signed-off-by: Oleg Zaytsev --- go.mod | 2 +- go.sum | 4 +- .../prometheus/tsdb/index/postings.go | 127 +++++++++++++----- vendor/modules.txt | 4 +- 4 files changed, 98 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 213d549dffc..829d7ecb3b0 100644 --- a/go.mod +++ b/go.mod @@ -257,7 +257,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240515135245-e5b85c151ba8 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index 0febb58ad78..64a18652936 100644 --- a/go.sum +++ b/go.sum @@ -517,8 +517,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240515135245-e5b85c151ba8 h1:XmqfG3buFH0G/ns/DXnyMx46LITUEENXjZ84f7YAUy0= -github.com/grafana/mimir-prometheus v0.0.0-20240515135245-e5b85c151ba8/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM= +github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 h1:PCmcaGd1Sb8VLosUGWDFKV5gE1A/yoBiPQuRmTSAjZY= +github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932/go.mod h1:ZlD3SoAHSwXK5VGLHv78Jh5kOpgSLaQAzt9gxq76fLM= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240603081606-9f1bbb0feb0c h1:Zgf0KrZy6of/6MQTRvzCVvKni/wWxEMWnipeqbo+0ek= diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go index 6cdfa56b13a..e1032ff12c2 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go @@ -289,41 +289,67 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { // Delete removes all ids in the given map from the postings lists. func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { - var keys, vals []string + // We will take an optimistic read lock for the entire method, + // and only lock for writing when we actually find something to delete. + // + // Each SeriesRef can appear in several Postings. + // To change each one, we need to know the label name and value that it is indexed under. + // We iterate over all label names, then for each name all values, + // and look for individual series to be deleted. + p.mtx.RLock() + defer p.mtx.RUnlock() // Collect all keys relevant for deletion once. New keys added afterwards // can by definition not be affected by any of the given deletes. - p.mtx.RLock() + keys := make([]string, 0, len(p.m)) + maxVals := 0 for n := range p.m { keys = append(keys, n) + if len(p.m[n]) > maxVals { + maxVals = len(p.m[n]) + } } - p.mtx.RUnlock() + vals := make([]string, 0, maxVals) for _, n := range keys { - p.mtx.RLock() + // Copy the values and iterate the copy: if we unlock in the loop below, + // another goroutine might modify the map while we are part-way through it. vals = vals[:0] for v := range p.m[n] { vals = append(vals, v) } - p.mtx.RUnlock() // For each posting we first analyse whether the postings list is affected by the deletes. - // If yes, we actually reallocate a new postings list. - for _, l := range vals { - // Only lock for processing one postings list so we don't block reads for too long. - p.mtx.Lock() - + // If no, we remove the label value from the vals list. + // This way we only need to Lock once later. + for i := 0; i < len(vals); { found := false - for _, id := range p.m[n][l] { + refs := p.m[n][vals[i]] + for _, id := range refs { if _, ok := deleted[id]; ok { + i++ found = true break } } + if !found { - p.mtx.Unlock() - continue + // Didn't match, bring the last value to this position, make the slice shorter and check again. + // The order of the slice doesn't matter as it comes from a map iteration. + vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] } + } + + // If no label values have deleted ids, just continue. + if len(vals) == 0 { + continue + } + + // The only vals left here are the ones that contain deleted ids. + // Now we take the write lock and remove the ids. + p.mtx.RUnlock() + p.mtx.Lock() + for _, l := range vals { repl := make([]storage.SeriesRef, 0, len(p.m[n][l])) for _, id := range p.m[n][l] { @@ -336,13 +362,14 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { } else { delete(p.m[n], l) } - p.mtx.Unlock() } - p.mtx.Lock() + + // Delete the key if we removed all values. if len(p.m[n]) == 0 { delete(p.m, n) } p.mtx.Unlock() + p.mtx.RLock() } } @@ -398,16 +425,62 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { } func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + // We'll copy the values into a slice and then match over that, + // this way we don't need to hold the mutex while we're matching, + // which can be slow (seconds) if the match function is a huge regex. + // Holding this lock prevents new series from being added (slows down the write path) + // and blocks the compaction process. + vals := p.labelValues(name) + for i, count := 0, 1; i < len(vals); count++ { + if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { + return ErrPostings(ctx.Err()) + } + + if match(vals[i]) { + i++ + continue + } + + // Didn't match, bring the last value to this position, make the slice shorter and check again. + // The order of the slice doesn't matter as it comes from a map iteration. + vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] + } + + // If none matched (or this label had no values), no need to grab the lock again. + if len(vals) == 0 { + return EmptyPostings() + } + + // Now `vals` only contains the values that matched, get their postings. + its := make([]Postings, 0, len(vals)) + p.mtx.RLock() + e := p.m[name] + for _, v := range vals { + if refs, ok := e[v]; ok { + // Some of the values may have been garbage-collected in the meantime this is fine, we'll just skip them. + // If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere + // because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels, + // because the series were deleted already. + its = append(its, NewListPostings(refs)) + } + } + // Let the mutex go before merging. + p.mtx.RUnlock() + + return Merge(ctx, its...) +} + +// labelValues returns a slice of label values for the given label name. +// It will take the read lock. +func (p *MemPostings) labelValues(name string) []string { p.mtx.RLock() + defer p.mtx.RUnlock() e := p.m[name] if len(e) == 0 { - p.mtx.RUnlock() - return EmptyPostings() + return nil } - // Benchmarking shows that first copying the values into a slice and then matching over that is - // faster than matching over the map keys directly, at least on AMD64. vals := make([]string, 0, len(e)) for v, srs := range e { if len(srs) > 0 { @@ -415,21 +488,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, } } - var its []Postings - count := 1 - for _, v := range vals { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { - p.mtx.RUnlock() - return ErrPostings(ctx.Err()) - } - count++ - if match(v) { - its = append(its, NewListPostings(e[v])) - } - } - p.mtx.RUnlock() - - return Merge(ctx, its...) + return vals } // ExpandPostings returns the postings expanded as a slice. diff --git a/vendor/modules.txt b/vendor/modules.txt index 830df11be00..d92a57751eb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -935,7 +935,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240515135245-e5b85c151ba8 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 ## explicit; go 1.21 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1550,7 +1550,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240515135245-e5b85c151ba8 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240611072449-0b185dc4a932 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b