diff --git a/CHANGELOG.md b/CHANGELOG.md index 80bf0a02413..0a477abc1d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ * [FEATURE] Querier / query-frontend: added `-querier.promql-experimental-functions-enabled` CLI flag (and respective YAML config option) to enable experimental PromQL functions. The experimental functions introduced are: `mad_over_time()`, `sort_by_label()` and `sort_by_label_desc()`. #7057 * [FEATURE] Alertmanager API: added `-alertmanager.grafana-alertmanager-compatibility-enabled` CLI flag (and respective YAML config option) to enable an experimental API endpoints that support the migration of the Grafana Alertmanager. #7057 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-strict-mode-enabled` to control support for any UTF-8 character as part of Alertmanager configuration/API matchers and labels. It's default value is set to `false`. #6898 +* [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293 * [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848 * [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766 * [ENHANCEMENT] Distributor: improve efficiency of some errors #6785 diff --git a/go.mod b/go.mod index 26420c4fe91..3f52eea28a7 100644 --- a/go.mod +++ b/go.mod @@ -256,7 +256,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index ef52a45e1c0..37929e19862 100644 --- a/go.sum +++ b/go.sum @@ -552,8 +552,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 h1:cU5K0ITFhJiSxDaDn8qvjCmd0xsDPDtizz57ZU5kaeM= -github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ= +github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c h1:jrkiLy8SjrLKbCeF1SCq6bKfdM2bR5/1hlnx+wwxAPQ= +github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c/go.mod h1:IMaHPfxuCuOjlEoyUE0AG7FGZLEwCg6WHXHLNQwPrJQ= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= diff --git a/pkg/compactor/bucket_compactor_e2e_test.go b/pkg/compactor/bucket_compactor_e2e_test.go index 6fe8a602606..9198dee9b00 100644 --- a/pkg/compactor/bucket_compactor_e2e_test.go +++ b/pkg/compactor/bucket_compactor_e2e_test.go @@ -28,16 +28,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" 
"github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" - "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/storage/tsdb/block" @@ -333,26 +329,16 @@ func TestGroupCompactE2E(t *testing.T) { labels.FromStrings("a", "7"), }, }, - }, []blockgenSpec{ - { - numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124, - series: []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "1", "b", "2"), - labels.FromStrings("a", "3"), - labels.FromStrings("a", "4"), - }, - }, }) require.NoError(t, bComp.Compact(ctx, 0), 0) assert.Equal(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) - assert.Equal(t, 1.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason))) + assert.Equal(t, 0.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason))) assert.Equal(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactions)) - assert.Equal(t, 3.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted)) + assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted)) assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted)) - assert.Equal(t, 1.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed)) + assert.Equal(t, 0.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed)) _, err = os.Stat(dir) assert.True(t, os.IsNotExist(err), "dir %s should be remove after compaction.", dir) @@ -364,7 +350,6 @@ func TestGroupCompactE2E(t *testing.T) { metas[4].ULID: false, metas[5].ULID: false, metas[8].ULID: false, - metas[9].ULID: false, } others := map[string]block.Meta{} require.NoError(t, bkt.Iter(ctx, "", func(n string) error { @@ -436,7 +421,7 @@ type blockgenSpec struct { res int64 } -func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*block.Meta) { +func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*block.Meta) { prepareDir := t.TempDir() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) @@ -447,15 +432,6 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, b metas = append(metas, meta) require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil)) } - for _, b := range blocksWithOutOfOrderChunks { - id, meta := createBlock(ctx, t, prepareDir, b) - - err := putOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt) - require.NoError(t, err) - - metas = append(metas, meta) - require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), nil)) - } return metas } @@ -747,103 +723,3 @@ func createBlockWithOptions( return id, nil } - -var indexFilename = "index" - -type indexWriterSeries struct { - labels labels.Labels - chunks []chunks.Meta // series file offset of chunks -} - -type indexWriterSeriesSlice []*indexWriterSeries - -// putOutOfOrderIndex updates the index in blockDir with an index 
containing an out-of-order chunk -// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346 -func putOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { - - if minTime >= maxTime || minTime+4 >= maxTime { - return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks") - } - - lbls := []labels.Labels{ - labels.FromStrings("lbl1", "1"), - } - - // Sort labels as the index writer expects series in sorted order. - sort.Sort(labels.Slice(lbls)) - - symbols := map[string]struct{}{} - for _, lset := range lbls { - lset.Range(func(l labels.Label) { - symbols[l.Name] = struct{}{} - symbols[l.Value] = struct{}{} - }) - } - - var input indexWriterSeriesSlice - - // Generate ChunkMetas for every label set. - for _, lset := range lbls { - var metas []chunks.Meta - // only need two chunks that are out-of-order - chk1 := chunks.Meta{ - MinTime: maxTime - 2, - MaxTime: maxTime - 1, - Ref: chunks.ChunkRef(rand.Uint64()), - Chunk: chunkenc.NewXORChunk(), - } - metas = append(metas, chk1) - chk2 := chunks.Meta{ - MinTime: minTime + 1, - MaxTime: minTime + 2, - Ref: chunks.ChunkRef(rand.Uint64()), - Chunk: chunkenc.NewXORChunk(), - } - metas = append(metas, chk2) - - input = append(input, &indexWriterSeries{ - labels: lset, - chunks: metas, - }) - } - - iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename)) - if err != nil { - return err - } - - syms := []string{} - for s := range symbols { - syms = append(syms, s) - } - slices.Sort(syms) - for _, s := range syms { - if err := iw.AddSymbol(s); err != nil { - return err - } - } - - // Population procedure as done by compaction. - var ( - postings = index.NewMemPostings() - values = map[string]map[string]struct{}{} - ) - - for i, s := range input { - if err := iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...); err != nil { - return err - } - - s.labels.Range(func(l labels.Label) { - valset, ok := values[l.Name] - if !ok { - valset = map[string]struct{}{} - values[l.Name] = valset - } - valset[l.Value] = struct{}{} - }) - postings.Add(storage.SeriesRef(i), s.labels) - } - - return iw.Close() -} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 89948e6d3f6..b0b7949ed08 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -2181,38 +2181,75 @@ func stopServiceFn(t *testing.T, serv services.Service) func() { } func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { - // Generate a single block with out of order chunks. - specs := []*block.SeriesSpec{ - { - Labels: labels.FromStrings("case", "out_of_order"), - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})), - must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})), - // Extend block to cover 2h. - must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})), + const user = "user" + + var ( + ctx = context.Background() + storageDir = t.TempDir() + fixtureDir = filepath.Join("fixtures", "test-ooo-compaction") + ) + + // Utility function originally used to generate a block with out of order chunks + // used by this test. The block has been generated commenting out the checks done + // by TSDB block Writer to prevent OOO chunks writing. 
+ _ = func() { + specs := []*block.SeriesSpec{ + { + Labels: labels.FromStrings("case", "out_of_order"), + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{newSample(20, 20, nil, nil), newSample(21, 21, nil, nil)})), + must(chunks.ChunkFromSamples([]chunks.Sample{newSample(10, 10, nil, nil), newSample(11, 11, nil, nil)})), + // Extend block to cover 2h. + must(chunks.ChunkFromSamples([]chunks.Sample{newSample(0, 0, nil, nil), newSample(2*time.Hour.Milliseconds()-1, 0, nil, nil)})), + }, }, - }, - } + } - const user = "user" + _, err := block.GenerateBlockFromSpec(fixtureDir, specs) + require.NoError(t, err) - storageDir := t.TempDir() - // We need two blocks to start compaction. - meta1, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs) - require.NoError(t, err) - meta2, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, user), specs) - require.NoError(t, err) + _, err = block.GenerateBlockFromSpec(fixtureDir, specs) + require.NoError(t, err) + } bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) + userBkt := bucket.NewUserBucketClient(user, bkt, nil) + + // Copy blocks from fixtures dir to the test bucket. + var metas []*block.Meta + + entries, err := os.ReadDir(fixtureDir) + require.NoError(t, err) + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + blockDir := filepath.Join(fixtureDir, entry.Name()) + + blockID, err := ulid.Parse(entry.Name()) + require.NoErrorf(t, err, "parsing block ID from directory name %q", entry.Name()) + + meta, err := block.ReadMetaFromDir(blockDir) + require.NoErrorf(t, err, "reading meta from block at %s", blockDir) + + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), userBkt, filepath.Join(fixtureDir, blockID.String()), meta)) + + metas = append(metas, meta) + } + + // We expect 2 blocks to have been copied. + require.Len(t, metas, 2) cfg := prepareConfig(t) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bkt) - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*block.Meta{meta1, meta2}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return(metas, nil) // Start the compactor - require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + require.NoError(t, services.StartAndAwaitRunning(ctx, c)) // Wait until a compaction run has been completed. test.Poll(t, 10*time.Second, 1.0, func() interface{} { @@ -2220,7 +2257,7 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { }) // Stop the compactor. - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + require.NoError(t, services.StopAndAwaitTerminated(ctx, c)) // Verify that compactor has found block with out of order chunks, and this block is now marked for no-compaction. r := regexp.MustCompile("level=info component=compactor user=user msg=\"block has been marked for no compaction\" block=([0-9A-Z]+)") @@ -2228,10 +2265,10 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { require.Len(t, matches, 2) // Entire string match + single group match.
skippedBlock := matches[1] - require.True(t, skippedBlock == meta1.ULID.String() || skippedBlock == meta2.ULID.String()) + require.True(t, skippedBlock == metas[0].ULID.String() || skippedBlock == metas[1].ULID.String()) m := &block.NoCompactMark{} - require.NoError(t, block.ReadMarker(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m)) + require.NoError(t, block.ReadMarker(ctx, log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m)) require.Equal(t, skippedBlock, m.ID.String()) require.NotZero(t, m.NoCompactTime) require.Equal(t, block.NoCompactReason(block.OutOfOrderChunksNoCompactReason), m.Reason) diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/chunks/000001 b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/chunks/000001 new file mode 100644 index 00000000000..dc6d6641465 Binary files /dev/null and b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/chunks/000001 differ diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/index b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/index new file mode 100644 index 00000000000..a269189d95a Binary files /dev/null and b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/index differ diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/meta.json b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/meta.json new file mode 100644 index 00000000000..75c30d62b06 --- /dev/null +++ b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKRP02T1EW2AXEZXG12QA/meta.json @@ -0,0 +1,22 @@ +{ + "ulid": "01HNWYKRP02T1EW2AXEZXG12QA", + "minTime": 0, + "maxTime": 7200000, + "stats": {}, + "compaction": { + "level": 1, + "sources": [ + "01HNWYKRP02T1EW2AXEZXG12QA" + ] + }, + "version": 1, + "out_of_order": false, + "thanos": { + "version": 1, + "labels": null, + "downsample": { + "resolution": 0 + }, + "source": "" + } +} diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/chunks/000001 b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/chunks/000001 new file mode 100644 index 00000000000..dc6d6641465 Binary files /dev/null and b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/chunks/000001 differ diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/index b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/index new file mode 100644 index 00000000000..a269189d95a Binary files /dev/null and b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/index differ diff --git a/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/meta.json b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/meta.json new file mode 100644 index 00000000000..77ae576cf37 --- /dev/null +++ b/pkg/compactor/fixtures/test-ooo-compaction/01HNWYKS6MZWPPV0Q3FNEK4GHZ/meta.json @@ -0,0 +1,22 @@ +{ + "ulid": "01HNWYKS6MZWPPV0Q3FNEK4GHZ", + "minTime": 0, + "maxTime": 7200000, + "stats": {}, + "compaction": { + "level": 1, + "sources": [ + "01HNWYKS6MZWPPV0Q3FNEK4GHZ" + ] + }, + "version": 1, + "out_of_order": false, + "thanos": { + "version": 1, + "labels": null, + "downsample": { + "resolution": 0 + }, + "source": "" + } +} diff --git a/pkg/frontend/querymiddleware/astmapper/parallel.go b/pkg/frontend/querymiddleware/astmapper/parallel.go index 
bed8b90b9d5..427d5d47546 100644 --- a/pkg/frontend/querymiddleware/astmapper/parallel.go +++ b/pkg/frontend/querymiddleware/astmapper/parallel.go @@ -34,6 +34,9 @@ var NonParallelFuncs = []string{ "sort", "time", "vector", + + // The following function may be parallelized using a strategy similar to avg(). + "histogram_avg", } // FuncsWithDefaultTimeArg is the list of functions that extract date information from a variadic list of params, diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 87cc485a5e7..5eb46a9ff48 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -474,40 +474,63 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) { ctx := context.Background() cfg := prepareStorageConfig(t) + fixtureDir := filepath.Join("fixtures", "test-query-block-with-ooo-chunks") + storageDir := t.TempDir() + + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + userBkt := bucket.NewUserBucketClient(userID, bkt, nil) - // Generate a single block with 1 series and a lot of samples. seriesWithOutOfOrderChunks := labels.FromStrings("case", "out_of_order", labels.MetricName, metricName) seriesWithOverlappingChunks := labels.FromStrings("case", "overlapping", labels.MetricName, metricName) - specs := []*block.SeriesSpec{ - // Series with out of order chunks. - { - Labels: seriesWithOutOfOrderChunks, - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 20, v: 20}, sample{t: 21, v: 21}})), - must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 10, v: 10}, sample{t: 11, v: 11}})), + + // Utility function originally used to generate a block with out of order chunks + // used by this test. The block has been generated commenting out the checks done + // by TSDB block Writer to prevent OOO chunks writing. + _ = func() { + specs := []*block.SeriesSpec{ + // Series with out of order chunks. + { + Labels: seriesWithOutOfOrderChunks, + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 20, v: 20}, sample{t: 21, v: 21}})), + must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 10, v: 10}, sample{t: 11, v: 11}})), + }, }, - }, - // Series with out of order and overlapping chunks. - { - Labels: seriesWithOverlappingChunks, - Chunks: []chunks.Meta{ - must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 20, v: 20}, sample{t: 21, v: 21}})), - must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 10, v: 10}, sample{t: 20, v: 20}})), + // Series with out of order and overlapping chunks. + { + Labels: seriesWithOverlappingChunks, + Chunks: []chunks.Meta{ + must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 20, v: 20}, sample{t: 21, v: 21}})), + must(chunks.ChunkFromSamples([]chunks.Sample{sample{t: 10, v: 10}, sample{t: 20, v: 20}})), + }, }, - }, + } + + _, err := block.GenerateBlockFromSpec(fixtureDir, specs) + require.NoError(t, err) } - storageDir := t.TempDir() - _, err := block.GenerateBlockFromSpec(filepath.Join(storageDir, userID), specs) + // Copy blocks from fixtures dir to the test bucket. 
+ entries, err := os.ReadDir(fixtureDir) require.NoError(t, err) - bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - require.NoError(t, err) + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + blockID, err := ulid.Parse(entry.Name()) + require.NoErrorf(t, err, "parsing block ID from directory name %q", entry.Name()) + + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), userBkt, filepath.Join(fixtureDir, blockID.String()), nil)) + } + + createBucketIndex(t, bkt, userID) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bkt, defaultLimitsOverrides(t), log.NewNopLogger(), reg) require.NoError(t, err) - createBucketIndex(t, bucket, userID) require.NoError(t, stores.InitialSync(ctx)) tests := map[string]struct { diff --git a/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/chunks/000001 b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/chunks/000001 new file mode 100644 index 00000000000..56609d7e4f9 Binary files /dev/null and b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/chunks/000001 differ diff --git a/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/index b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/index new file mode 100644 index 00000000000..2c080a905a9 Binary files /dev/null and b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/index differ diff --git a/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/meta.json b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/meta.json new file mode 100644 index 00000000000..1556ba7b77d --- /dev/null +++ b/pkg/storegateway/fixtures/test-query-block-with-ooo-chunks/01HNWXEBE3GSYWM2BX7FPFDEKT/meta.json @@ -0,0 +1,22 @@ +{ + "ulid": "01HNWXEBE3GSYWM2BX7FPFDEKT", + "minTime": 10, + "maxTime": 22, + "stats": {}, + "compaction": { + "level": 1, + "sources": [ + "01HNWXEBE3GSYWM2BX7FPFDEKT" + ] + }, + "version": 1, + "out_of_order": false, + "thanos": { + "version": 1, + "labels": null, + "downsample": { + "resolution": 0 + }, + "source": "" + } +} diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go index 49d44632856..1e880410a18 100644 --- a/vendor/github.com/prometheus/prometheus/promql/engine.go +++ b/vendor/github.com/prometheus/prometheus/promql/engine.go @@ -1453,6 +1453,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // Reuse objects across steps to save memory allocations. 
var floats []FPoint var histograms []HPoint + var prevSS *Series inMatrix := make(Matrix, 1) inArgs[matrixArgIndex] = inMatrix enh := &EvalNodeHelper{Out: make(Vector, 0, 1)} @@ -1513,12 +1514,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if len(outVec) > 0 { if outVec[0].H == nil { if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) + ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) } + ss.Floats = append(ss.Floats, FPoint{F: outVec[0].F, T: ts}) } else { if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) + ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) } ss.Histograms = append(ss.Histograms, HPoint{H: outVec[0].H, T: ts}) } @@ -1527,9 +1529,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio it.ReduceDelta(stepRange) } histSamples := totalHPointSize(ss.Histograms) + if len(ss.Floats)+histSamples > 0 { if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples { mat = append(mat, ss) + prevSS = &mat[len(mat)-1] ev.currentSamples += len(ss.Floats) + histSamples } else { ev.error(ErrTooManySamples(env)) @@ -1679,6 +1683,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } mat := make(Matrix, 0, len(e.Series)) + var prevSS *Series it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator for i, s := range e.Series { @@ -1698,14 +1703,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if ev.currentSamples < ev.maxSamples { if h == nil { if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) + ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) } ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) ev.currentSamples++ ev.samplesStats.IncrementSamplesAtStep(step, 1) } else { if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) + ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) } point := HPoint{H: h, T: ts} ss.Histograms = append(ss.Histograms, point) @@ -1721,6 +1726,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if len(ss.Floats)+len(ss.Histograms) > 0 { mat = append(mat, ss) + prevSS = &mat[len(mat)-1] } } ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -1841,6 +1847,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio panic(fmt.Errorf("unhandled expression of type: %T", expr)) } +// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room. +// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. +func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) { + if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 { + r = prevSS.Histograms[len(prevSS.Histograms):] + prevSS.Histograms = prevSS.Histograms[0:len(prevSS.Histograms):len(prevSS.Histograms)] + return + } + + return getHPointSlice(numSteps) +} + +// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room. +// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. 
+func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) { + if prevSS != nil && cap(prevSS.Floats)-2*len(prevSS.Floats) > 0 { + r = prevSS.Floats[len(prevSS.Floats):] + prevSS.Floats = prevSS.Floats[0:len(prevSS.Floats):len(prevSS.Floats)] + return + } + + return getFPointSlice(numSteps) +} + func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { ws, err := checkAndExpandSeriesSet(ev.ctx, vs) if err != nil { diff --git a/vendor/github.com/prometheus/prometheus/promql/functions.go b/vendor/github.com/prometheus/prometheus/promql/functions.go index a0a118c114b..fe1a5644ec4 100644 --- a/vendor/github.com/prometheus/prometheus/promql/functions.go +++ b/vendor/github.com/prometheus/prometheus/promql/functions.go @@ -1081,6 +1081,23 @@ func funcHistogramSum(vals []parser.Value, args parser.Expressions, enh *EvalNod return enh.Out, nil } +// === histogram_avg(Vector parser.ValueTypeVector) (Vector, Annotations) === +func funcHistogramAvg(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + inVec := vals[0].(Vector) + + for _, sample := range inVec { + // Skip non-histogram samples. + if sample.H == nil { + continue + } + enh.Out = append(enh.Out, Sample{ + Metric: sample.Metric.DropMetricName(), + F: sample.H.Sum / sample.H.Count, + }) + } + return enh.Out, nil +} + // === histogram_stddev(Vector parser.ValueTypeVector) (Vector, Annotations) === func funcHistogramStdDev(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { inVec := vals[0].(Vector) @@ -1532,6 +1549,7 @@ var FunctionCalls = map[string]FunctionCall{ "deriv": funcDeriv, "exp": funcExp, "floor": funcFloor, + "histogram_avg": funcHistogramAvg, "histogram_count": funcHistogramCount, "histogram_fraction": funcHistogramFraction, "histogram_quantile": funcHistogramQuantile, diff --git a/vendor/github.com/prometheus/prometheus/promql/parser/functions.go b/vendor/github.com/prometheus/prometheus/promql/parser/functions.go index 46d50d54761..99b41321fed 100644 --- a/vendor/github.com/prometheus/prometheus/promql/parser/functions.go +++ b/vendor/github.com/prometheus/prometheus/promql/parser/functions.go @@ -167,6 +167,11 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeVector}, ReturnType: ValueTypeVector, }, + "histogram_avg": { + Name: "histogram_avg", + ArgTypes: []ValueType{ValueTypeVector}, + ReturnType: ValueTypeVector, + }, "histogram_count": { Name: "histogram_count", ArgTypes: []ValueType{ValueTypeVector}, diff --git a/vendor/github.com/prometheus/prometheus/promql/testdata/native_histograms.test b/vendor/github.com/prometheus/prometheus/promql/testdata/native_histograms.test index 5f0945d32f5..1da68a385f8 100644 --- a/vendor/github.com/prometheus/prometheus/promql/testdata/native_histograms.test +++ b/vendor/github.com/prometheus/prometheus/promql/testdata/native_histograms.test @@ -11,6 +11,9 @@ eval instant at 5m histogram_count(empty_histogram) eval instant at 5m histogram_sum(empty_histogram) {} 0 +eval instant at 5m histogram_avg(empty_histogram) + {} NaN + eval instant at 5m histogram_fraction(-Inf, +Inf, empty_histogram) {} NaN @@ -31,6 +34,10 @@ eval instant at 5m histogram_count(single_histogram) eval instant at 5m histogram_sum(single_histogram) {} 5 +# histogram_avg calculates the average from sum and count properties. 
+eval instant at 5m histogram_avg(single_histogram) + {} 1.25 + # We expect half of the values to fall in the range 1 < x <= 2. eval instant at 5m histogram_fraction(1, 2, single_histogram) {} 0.5 @@ -55,6 +62,9 @@ eval instant at 5m histogram_count(multi_histogram) eval instant at 5m histogram_sum(multi_histogram) {} 5 +eval instant at 5m histogram_avg(multi_histogram) + {} 1.25 + eval instant at 5m histogram_fraction(1, 2, multi_histogram) {} 0.5 @@ -69,6 +79,9 @@ eval instant at 50m histogram_count(multi_histogram) eval instant at 50m histogram_sum(multi_histogram) {} 5 +eval instant at 50m histogram_avg(multi_histogram) + {} 1.25 + eval instant at 50m histogram_fraction(1, 2, multi_histogram) {} 0.5 @@ -89,6 +102,9 @@ eval instant at 5m histogram_count(incr_histogram) eval instant at 5m histogram_sum(incr_histogram) {} 6 +eval instant at 5m histogram_avg(incr_histogram) + {} 1.2 + # We expect 3/5ths of the values to fall in the range 1 < x <= 2. eval instant at 5m histogram_fraction(1, 2, incr_histogram) {} 0.6 @@ -106,6 +122,9 @@ eval instant at 50m histogram_count(incr_histogram) eval instant at 50m histogram_sum(incr_histogram) {} 24 +eval instant at 50m histogram_avg(incr_histogram) + {} 1.7142857142857142 + # We expect 12/14ths of the values to fall in the range 1 < x <= 2. eval instant at 50m histogram_fraction(1, 2, incr_histogram) {} 0.8571428571428571 @@ -140,6 +159,9 @@ eval instant at 5m histogram_count(low_res_histogram) eval instant at 5m histogram_sum(low_res_histogram) {} 8 +eval instant at 5m histogram_avg(low_res_histogram) + {} 1.6 + # We expect all values to fall into the lower-resolution bucket with the range 1 < x <= 4. eval instant at 5m histogram_fraction(1, 4, low_res_histogram) {} 1 @@ -157,6 +179,9 @@ eval instant at 5m histogram_count(single_zero_histogram) eval instant at 5m histogram_sum(single_zero_histogram) {} 0.25 +eval instant at 5m histogram_avg(single_zero_histogram) + {} 0.25 + # When only the zero bucket is populated, or there are negative buckets, the distribution is assumed to be equally # distributed around zero; i.e. that there are an equal number of positive and negative observations. Therefore the # entire distribution must lie within the full range of the zero bucket, in this case: -0.5 < x <= +0.5. @@ -179,6 +204,9 @@ eval instant at 5m histogram_count(negative_histogram) eval instant at 5m histogram_sum(negative_histogram) {} -5 +eval instant at 5m histogram_avg(negative_histogram) + {} -1.25 + # We expect half of the values to fall in the range -2 < x <= -1. 
eval instant at 5m histogram_fraction(-2, -1, negative_histogram) {} 0.5 @@ -199,6 +227,9 @@ eval instant at 10m histogram_count(two_samples_histogram) eval instant at 10m histogram_sum(two_samples_histogram) {} -4 +eval instant at 10m histogram_avg(two_samples_histogram) + {} -1 + eval instant at 10m histogram_fraction(-2, -1, two_samples_histogram) {} 0.5 @@ -217,6 +248,9 @@ eval instant at 5m histogram_count(balanced_histogram) eval instant at 5m histogram_sum(balanced_histogram) {} 0 +eval instant at 5m histogram_avg(balanced_histogram) + {} 0 + eval instant at 5m histogram_fraction(0, 4, balanced_histogram) {} 0.5 diff --git a/vendor/github.com/prometheus/prometheus/rules/alerting.go b/vendor/github.com/prometheus/prometheus/rules/alerting.go index eed1cc3fd15..5c82844ea1e 100644 --- a/vendor/github.com/prometheus/prometheus/rules/alerting.go +++ b/vendor/github.com/prometheus/prometheus/rules/alerting.go @@ -171,9 +171,8 @@ func NewAlertingRule( evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), - - noDependentRules: atomic.NewBool(false), - noDependencyRules: atomic.NewBool(false), + noDependentRules: atomic.NewBool(false), + noDependencyRules: atomic.NewBool(false), } } diff --git a/vendor/github.com/prometheus/prometheus/rules/dependency.go b/vendor/github.com/prometheus/prometheus/rules/dependency.go deleted file mode 100644 index 02f8b626015..00000000000 --- a/vendor/github.com/prometheus/prometheus/rules/dependency.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rules - -import "github.com/prometheus/prometheus/promql/parser" - -// dependencyMap is a data-structure which contains the relationships between rules within a group. -// It is used to describe the dependency associations between rules in a group whereby one rule uses the -// output metric produced by another rule in its expression (i.e. as its "input"). -type dependencyMap map[Rule][]Rule - -// dependents returns the count of rules which use the output of the given rule as one of their inputs. -func (m dependencyMap) dependents(r Rule) int { - return len(m[r]) -} - -// dependencies returns the count of rules on which the given rule is dependent for input. -func (m dependencyMap) dependencies(r Rule) int { - if len(m) == 0 { - return 0 - } - - var count int - for _, children := range m { - for _, child := range children { - if child == r { - count++ - } - } - } - - return count -} - -// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule -// dependent on its output. -func (m dependencyMap) isIndependent(r Rule) bool { - if m == nil { - return false - } - - return m.dependents(r)+m.dependencies(r) == 0 -} - -// buildDependencyMap builds a data-structure which contains the relationships between rules within a group. 
-// -// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose -// output an Alert rule depends will not be able to run concurrently. -// -// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be -// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: -// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector -// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE -// -// Rules which are independent can run concurrently with no side-effects. -func buildDependencyMap(rules []Rule) dependencyMap { - dependencies := make(dependencyMap) - - if len(rules) <= 1 { - // No relationships if group has 1 or fewer rules. - return nil - } - - inputs := make(map[string][]Rule, len(rules)) - outputs := make(map[string][]Rule, len(rules)) - - var indeterminate bool - - for _, rule := range rules { - rule := rule - - name := rule.Name() - outputs[name] = append(outputs[name], rule) - - parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { - if n, ok := node.(*parser.VectorSelector); ok { - // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, - // which means we cannot safely run any rules concurrently. - if n.Name == "" && len(n.LabelMatchers) > 0 { - indeterminate = true - return nil - } - - // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour - // if they run concurrently. - if n.Name == alertMetricName || n.Name == alertForStateMetricName { - indeterminate = true - return nil - } - - inputs[n.Name] = append(inputs[n.Name], rule) - } - return nil - }) - } - - if indeterminate { - return nil - } - - if len(inputs) == 0 || len(outputs) == 0 { - // No relationships can be inferred. - return nil - } - - for output, outRules := range outputs { - for _, outRule := range outRules { - if rs, found := inputs[output]; found && len(rs) > 0 { - dependencies[outRule] = append(dependencies[outRule], rs...) - } - } - } - - return dependencies -} diff --git a/vendor/github.com/prometheus/prometheus/rules/group.go b/vendor/github.com/prometheus/prometheus/rules/group.go index 29afb5e5bf9..751c27aeeab 100644 --- a/vendor/github.com/prometheus/prometheus/rules/group.go +++ b/vendor/github.com/prometheus/prometheus/rules/group.go @@ -22,8 +22,11 @@ import ( "sync" "time" + "go.uber.org/atomic" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/promql/parser" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -45,7 +48,6 @@ type Group struct { name string file string interval time.Duration - evaluationDelay *time.Duration limit int rules []Rule sourceTenants []string @@ -72,6 +74,10 @@ type Group struct { // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc + // concurrencyController controls the rules evaluation concurrency. 
+ concurrencyController RuleConcurrencyController + + evaluationDelay *time.Duration alignEvaluationTimeOnInterval bool } @@ -119,23 +125,30 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } + concurrencyController := o.Opts.RuleConcurrencyController + if concurrencyController == nil { + concurrencyController = sequentialRuleEvalController{} + } + return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + sourceTenants: o.SourceTenants, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, + concurrencyController: concurrencyController, + evaluationDelay: o.EvaluationDelay, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - sourceTenants: o.SourceTenants, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - evalIterationFunc: evalIterationFunc, alignEvaluationTimeOnInterval: o.AlignEvaluationTimeOnInterval, } } @@ -437,9 +450,15 @@ func (g *Group) CopyState(from *Group) { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal float64 - evaluationDelay := g.EvaluationDelay() + var ( + samplesTotal atomic.Float64 + wg sync.WaitGroup + + evaluationDelay = g.EvaluationDelay() + ) + for i, rule := range g.rules { select { case <-g.done: @@ -447,7 +466,11 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - func(i int, rule Rule) { + eval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() + } + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) @@ -483,7 +506,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } rule.SetHealth(HealthGood) rule.SetLastError(nil) - samplesTotal += float64(len(vector)) + samplesTotal.Add(float64(len(vector))) if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) @@ -572,11 +595,25 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - }(i, rule) - } - if g.metrics != nil { - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) + } + + // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. + // Try run concurrently if there are slots available. 
+ if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() { + wg.Add(1) + + go eval(i, rule, func() { + wg.Done() + ctrl.Done() + }) + } else { + eval(i, rule, nil) + } } + + wg.Wait() + + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) g.cleanupStaleSeries(ctx, ts) } @@ -921,3 +958,114 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { return m } + +// dependencyMap is a data-structure which contains the relationships between rules within a group. +// It is used to describe the dependency associations between rules in a group whereby one rule uses the +// output metric produced by another rule in its expression (i.e. as its "input"). +type dependencyMap map[Rule][]Rule + +// dependents returns the count of rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) int { + return len(m[r]) +} + +// dependencies returns the count of rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) int { + if len(m) == 0 { + return 0 + } + + var count int + for _, children := range m { + for _, child := range children { + if child == r { + count++ + } + } + } + + return count +} + +// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule +// dependent on its output. +func (m dependencyMap) isIndependent(r Rule) bool { + if m == nil { + return false + } + + return m.dependents(r)+m.dependencies(r) == 0 +} + +// buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +// +// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose +// output an Alert rule depends will not be able to run concurrently. +// +// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be +// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: +// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector +// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE +// +// Rules which are independent can run concurrently with no side-effects. +func buildDependencyMap(rules []Rule) dependencyMap { + dependencies := make(dependencyMap) + + if len(rules) <= 1 { + // No relationships if group has 1 or fewer rules. + return dependencies + } + + inputs := make(map[string][]Rule, len(rules)) + outputs := make(map[string][]Rule, len(rules)) + + var indeterminate bool + + for _, rule := range rules { + if indeterminate { + break + } + + name := rule.Name() + outputs[name] = append(outputs[name], rule) + + parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { + if n, ok := node.(*parser.VectorSelector); ok { + // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, + // which means we cannot safely run any rules concurrently. + if n.Name == "" && len(n.LabelMatchers) > 0 { + indeterminate = true + return nil + } + + // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour + // if they run concurrently. 
+ if n.Name == alertMetricName || n.Name == alertForStateMetricName { + indeterminate = true + return nil + } + + inputs[n.Name] = append(inputs[n.Name], rule) + } + return nil + }) + } + + if indeterminate { + return nil + } + + for output, outRules := range outputs { + for _, outRule := range outRules { + if inRules, found := inputs[output]; found && len(inRules) > 0 { + dependencies[outRule] = append(dependencies[outRule], inRules...) + } + } + } + + return dependencies +} + +func isRuleEligibleForConcurrentExecution(rule Rule) bool { + return rule.NoDependentRules() && rule.NoDependencyRules() +} diff --git a/vendor/github.com/prometheus/prometheus/rules/manager.go b/vendor/github.com/prometheus/prometheus/rules/manager.go index b82c1e90781..791510d5586 100644 --- a/vendor/github.com/prometheus/prometheus/rules/manager.go +++ b/vendor/github.com/prometheus/prometheus/rules/manager.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -105,22 +106,28 @@ type ContextWrapFunc func(ctx context.Context, g *Group) context.Context // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + RuleConcurrencyController RuleConcurrencyController + RuleDependencyController RuleDependencyController + + DefaultEvaluationDelay func() time.Duration + // GroupEvaluationContextFunc will be called to wrap Context based on the group being evaluated. // Will be skipped if nil. GroupEvaluationContextFunc ContextWrapFunc - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader - DefaultEvaluationDelay func() time.Duration // AlwaysRestoreAlertState forces all new or changed groups in calls to Update to restore. // Useful when you know you will be adding alerting rules after the manager has already started. @@ -140,6 +147,18 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + if o.RuleConcurrencyController == nil { + if o.ConcurrentEvalsEnabled { + o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + } else { + o.RuleConcurrencyController = sequentialRuleEvalController{} + } + } + + if o.RuleDependencyController == nil { + o.RuleDependencyController = ruleDependencyController{} + } + m := &Manager{ groups: map[string]*Group{}, opts: o, @@ -324,11 +343,7 @@ func (m *Manager) LoadGroups( } // Check dependencies between rules and store it on the Rule itself. 
- depMap := buildDependencyMap(rules) - for _, r := range rules { - r.SetNoDependentRules(depMap.dependents(r) == 0) - r.SetNoDependencyRules(depMap.dependencies(r) == 0) - } + m.opts.RuleDependencyController.AnalyseRules(rules) groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ Name: rg.Name, @@ -428,3 +443,91 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } } + +// RuleDependencyController controls whether a set of rules have dependencies between each other. +type RuleDependencyController interface { + // AnalyseRules analyses dependencies between the input rules. For each rule that it's guaranteed + // not having any dependants and/or dependency, this function should call Rule.SetNoDependentRules(true) + // and/or Rule.SetNoDependencyRules(true). + AnalyseRules(rules []Rule) +} + +type ruleDependencyController struct{} + +// AnalyseRules implements RuleDependencyController. +func (c ruleDependencyController) AnalyseRules(rules []Rule) { + depMap := buildDependencyMap(rules) + for _, r := range rules { + r.SetNoDependentRules(depMap.dependents(r) == 0) + r.SetNoDependencyRules(depMap.dependencies(r) == 0) + } +} + +// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently. +// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus +// server with additional query load. Concurrency is controlled globally, not on a per-group basis. +type RuleConcurrencyController interface { + // Allow determines whether any concurrent evaluation slots are available. + // If Allow() returns true, then Done() must be called to release the acquired slot. + Allow() bool + + // Done releases a concurrent evaluation slot. + Done() +} + +// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +type concurrentRuleEvalController struct { + sema *semaphore.Weighted + depMapsMu sync.Mutex + depMaps map[*Group]dependencyMap +} + +func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } +} + +func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) +} + +func (c *concurrentRuleEvalController) Allow() bool { + return c.sema.TryAcquire(1) +} + +func (c *concurrentRuleEvalController) Done() { + c.sema.Release(1) +} + +func (c *concurrentRuleEvalController) Invalidate() { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. + c.depMaps = map[*Group]dependencyMap{} +} + +// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. 
+type sequentialRuleEvalController struct{} + +func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { + return false +} + +func (c sequentialRuleEvalController) Allow() bool { + return false +} + +func (c sequentialRuleEvalController) Done() {} +func (c sequentialRuleEvalController) Invalidate() {} diff --git a/vendor/github.com/prometheus/prometheus/rules/origin.go b/vendor/github.com/prometheus/prometheus/rules/origin.go index 53db5a27d25..695fc5f8382 100644 --- a/vendor/github.com/prometheus/prometheus/rules/origin.go +++ b/vendor/github.com/prometheus/prometheus/rules/origin.go @@ -56,11 +56,10 @@ func NewRuleDetail(r Rule) RuleDetail { } return RuleDetail{ - Name: r.Name(), - Query: r.Query().String(), - Labels: r.Labels(), - Kind: kind, - + Name: r.Name(), + Query: r.Query().String(), + Labels: r.Labels(), + Kind: kind, NoDependentRules: r.NoDependentRules(), NoDependencyRules: r.NoDependencyRules(), } diff --git a/vendor/github.com/prometheus/prometheus/rules/recording.go b/vendor/github.com/prometheus/prometheus/rules/recording.go index 5f535c17276..243b6ebc4bd 100644 --- a/vendor/github.com/prometheus/prometheus/rules/recording.go +++ b/vendor/github.com/prometheus/prometheus/rules/recording.go @@ -56,9 +56,8 @@ func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *Reco evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), - - noDependentRules: atomic.NewBool(false), - noDependencyRules: atomic.NewBool(false), + noDependentRules: atomic.NewBool(false), + noDependencyRules: atomic.NewBool(false), } } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/client.go b/vendor/github.com/prometheus/prometheus/storage/remote/client.go index fbb68049832..e765b47c3eb 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/client.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/client.go @@ -141,24 +141,24 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { } t := httpClient.Transport + if len(conf.Headers) > 0 { + t = newInjectHeadersRoundTripper(conf.Headers, t) + } + if conf.SigV4Config != nil { - t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, httpClient.Transport) + t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, t) if err != nil { return nil, err } } if conf.AzureADConfig != nil { - t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, httpClient.Transport) + t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, t) if err != nil { return nil, err } } - if len(conf.Headers) > 0 { - t = newInjectHeadersRoundTripper(conf.Headers, t) - } - httpClient.Transport = otelhttp.NewTransport(t) return &Client{ diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head.go b/vendor/github.com/prometheus/prometheus/tsdb/head.go index 7b42502dd87..fc9c64c595f 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head.go @@ -1528,9 +1528,9 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match continue } - series.RLock() + series.Lock() t0, t1 := series.minTime(), series.maxTime() - series.RUnlock() + series.Unlock() if t0 == math.MinInt64 || t1 == math.MinInt64 { continue } @@ -2065,7 +2065,7 @@ func (s sample) Type() chunkenc.ValueType { // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. 
type memSeries struct { - sync.RWMutex + sync.Mutex ref chunks.HeadSeriesRef lset labels.Labels diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go index 9f058f06384..bd5ef53463a 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go @@ -691,9 +691,9 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, return 0, fmt.Errorf("unknown series when trying to add metadata with HeadSeriesRef: %d and labels: %s", ref, lset) } - s.RLock() + s.Lock() hasNewMetadata := s.meta == nil || *s.meta != meta - s.RUnlock() + s.Unlock() if hasNewMetadata { a.metadata = append(a.metadata, record.RefMetadata{ diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go index da7fdf7b297..750836a850f 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go @@ -51,6 +51,8 @@ const ( FormatV2 = 2 indexFilename = "index" + + seriesByteAlign = 16 ) type indexWriterSeries struct { @@ -145,8 +147,11 @@ type Writer struct { labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. - lastSeries labels.Labels - lastRef storage.SeriesRef + lastSeries labels.Labels + lastSeriesRef storage.SeriesRef + + // Hold last added chunk reference to make sure that chunks are ordered properly. + lastChunkRef chunks.ChunkRef crc32 hash.Hash @@ -432,16 +437,34 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... return fmt.Errorf("out-of-order series added with label set %q", lset) } - if ref < w.lastRef && !w.lastSeries.IsEmpty() { + if ref < w.lastSeriesRef && !w.lastSeries.IsEmpty() { return fmt.Errorf("series with reference greater than %d already added", ref) } + + lastChunkRef := w.lastChunkRef + lastMaxT := int64(0) + for ix, c := range chunks { + if c.Ref < lastChunkRef { + return fmt.Errorf("unsorted chunk reference: %d, previous: %d", c.Ref, lastChunkRef) + } + lastChunkRef = c.Ref + + if ix > 0 && c.MinTime <= lastMaxT { + return fmt.Errorf("chunk minT %d is not higher than previous chunk maxT %d", c.MinTime, lastMaxT) + } + if c.MaxTime < c.MinTime { + return fmt.Errorf("chunk maxT %d is less than minT %d", c.MaxTime, c.MinTime) + } + lastMaxT = c.MaxTime + } + // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. - if err := w.addPadding(16); err != nil { + if err := w.addPadding(seriesByteAlign); err != nil { return fmt.Errorf("failed to write padding bytes: %w", err) } - if w.f.pos%16 != 0 { + if w.f.pos%seriesByteAlign != 0 { return fmt.Errorf("series write not 16-byte aligned at %d", w.f.pos) } @@ -509,7 +532,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... } w.lastSeries.CopyFrom(lset) - w.lastRef = ref + w.lastSeriesRef = ref + w.lastChunkRef = lastChunkRef return nil } @@ -712,20 +736,13 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { return err } } + // Write out the length. 
- w.buf1.Reset() - l := w.f.pos - startPos - 4 - if l > math.MaxUint32 { - return fmt.Errorf("label indexes offset table size exceeds 4 bytes: %d", l) - } - w.buf1.PutBE32int(int(l)) - if err := w.writeAt(w.buf1.Get(), startPos); err != nil { - return err + err := w.writeLengthAndHash(startPos) + if err != nil { + return fmt.Errorf("label indexes offset table length/crc32 write error: %w", err) } - - w.buf1.Reset() - w.buf1.PutHashSum(w.crc32) - return w.write(w.buf1.Get()) + return nil } // writePostingsOffsetTable writes the postings offset table. @@ -793,21 +810,31 @@ func (w *Writer) writePostingsOffsetTable() error { } w.fPO = nil - // Write out the length. + err = w.writeLengthAndHash(startPos) + if err != nil { + return fmt.Errorf("postings offset table length/crc32 write error: %w", err) + } + return nil +} + +func (w *Writer) writeLengthAndHash(startPos uint64) error { w.buf1.Reset() l := w.f.pos - startPos - 4 if l > math.MaxUint32 { - return fmt.Errorf("postings offset table size exceeds 4 bytes: %d", l) + return fmt.Errorf("length size exceeds 4 bytes: %d", l) } w.buf1.PutBE32int(int(l)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { - return err + return fmt.Errorf("write length from buffer error: %w", err) } - // Finally write the hash. + // Write out the hash. w.buf1.Reset() w.buf1.PutHashSum(w.crc32) - return w.write(w.buf1.Get()) + if err := w.write(w.buf1.Get()); err != nil { + return fmt.Errorf("write buffer's crc32 error: %w", err) + } + return nil } const indexTOCLen = 6*8 + crc32.Size @@ -850,10 +877,10 @@ func (w *Writer) writePostingsToTmpFiles() error { for d.Len() > 0 { d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) - if startPos%16 != 0 { + if startPos%seriesByteAlign != 0 { return fmt.Errorf("series not 16-byte aligned at %d", startPos) } - offsets = append(offsets, uint32(startPos/16)) + offsets = append(offsets, uint32(startPos/seriesByteAlign)) // Skip to next series. x := d.Uvarint() d.Skip(x + crc32.Size) @@ -912,7 +939,7 @@ func (w *Writer) writePostingsToTmpFiles() error { if _, ok := postings[lno]; !ok { postings[lno] = map[uint32][]uint32{} } - postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) + postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/seriesByteAlign)) } } // Skip to next series. @@ -949,7 +976,6 @@ func (w *Writer) writePostingsToTmpFiles() error { return w.ctx.Err() default: } - } return nil } @@ -1575,7 +1601,7 @@ func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([ // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. if r.version == FormatV2 { - offset = id * 16 + offset = id * seriesByteAlign } d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) @@ -1614,7 +1640,7 @@ func (r *Reader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. if r.version == FormatV2 { - offset = id * 16 + offset = id * seriesByteAlign } d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) buf := d.Get() @@ -1640,7 +1666,7 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. 
if r.version == FormatV2 { - offset = id * 16 + offset = id * seriesByteAlign } d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) if d.Err() != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index c97c7107823..c6d1a4af220 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -905,7 +905,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c ## explicit; go 1.20 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1521,7 +1521,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240129161136-6383c86b1da4 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240205112357-84eae046431c # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
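
The new `histogram_avg()` function vendored above computes, for each native histogram sample, the ratio of the histogram's sum to its count, which is why `single_histogram` (4 observations summing to 5) evaluates to 1.25 in the test data. Below is a minimal standalone sketch of that calculation, not Mimir or Prometheus code, assuming only the `FloatHistogram` type from `github.com/prometheus/prometheus/model/histogram`:

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/model/histogram"
    )

    func main() {
        // Mirrors the single_histogram fixture: count=4, sum=5.
        h := &histogram.FloatHistogram{Count: 4, Sum: 5}

        // histogram_avg(v) emits Sum / Count for each native histogram sample.
        fmt.Println(h.Sum / h.Count) // 1.25
    }

The result is equivalent to `histogram_sum(v) / histogram_count(v)` expressed as a single function call.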
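
The `reuseOrGetFPointSlices` / `reuseOrGetHPointSlices` helpers added to `promql/engine.go` rely on Go's full (three-index) slice expressions: the next series takes over the unused tail of the previous series' point slice, and the previous slice's capacity is clamped so later appends to it cannot overwrite the handed-off tail. A small self-contained illustration of just that trick; the variable names are made up for the example:

    package main

    import "fmt"

    func main() {
        // A previous series' slice with spare capacity, as handed out by the point-slice pool.
        prev := make([]int, 2, 16)
        prev[0], prev[1] = 10, 11

        // Hand the unused tail of prev's backing array to the next series.
        next := prev[len(prev):]

        // Clamp prev's capacity with a full slice expression so a future append to prev
        // reallocates instead of writing into next's elements.
        prev = prev[0:len(prev):len(prev)]

        prev = append(prev, 12) // reallocates; next is untouched
        next = append(next, 20) // writes into the reused tail

        fmt.Println(prev, next) // [10 11 12] [20]
    }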
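
The rule-group changes gate concurrent rule evaluation behind the new `RuleConcurrencyController`: `Allow()` tries to take a slot from a weighted semaphore, `Done()` returns it, and a rule is evaluated inline when it has dependencies or no slot is free. A rough sketch of that gating pattern using `golang.org/x/sync/semaphore`; the `evalRule` closure and the limit of 2 are placeholders, not values taken from this change:

    package main

    import (
        "fmt"
        "sync"

        "golang.org/x/sync/semaphore"
    )

    type controller struct{ sema *semaphore.Weighted }

    func (c *controller) Allow() bool { return c.sema.TryAcquire(1) }
    func (c *controller) Done()       { c.sema.Release(1) }

    func main() {
        ctrl := &controller{sema: semaphore.NewWeighted(2)}              // placeholder: at most 2 rules in flight
        evalRule := func(i int) { fmt.Println("evaluated rule", i) }     // placeholder for rule evaluation

        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            if ctrl.Allow() {
                wg.Add(1)
                go func(i int) {
                    defer wg.Done()
                    defer ctrl.Done()
                    evalRule(i)
                }(i)
            } else {
                // No free slot: evaluate inline, like the sequential fallback in Group.Eval.
                evalRule(i)
            }
        }
        wg.Wait()
    }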