Compactor: export estimated number of compaction jobs based on bucket-index (#7299)

* Compute the number of compaction jobs from the bucket index and export it via the cortex_bucket_index_compaction_jobs metric.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Add PR number.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Remove unused parameter name.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Make linter happy.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix tests.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
pstibrany authored Feb 5, 2024
1 parent fe3bc0f commit b08086a
Showing 4 changed files with 188 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -58,6 +58,7 @@
* [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131
* [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146
* [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_of_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267
* [ENHANCEMENT] Compactor: after updating the bucket index, the compactor now also computes the estimated number of compaction jobs based on the current bucket index, and reports the result via the `cortex_bucket_index_estimated_compaction_jobs` metric. If the job estimation fails, the `cortex_bucket_index_estimated_compaction_jobs_errors_total` counter is incremented instead. #7299
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
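
A sketch of how the new series might be consumed, assuming a Prometheus server that scrapes the compactor (the address and query below are illustrative, not part of this commit):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// Hypothetical Prometheus address; adjust for your environment.
	client, err := api.NewClient(api.Config{Address: "http://prometheus:9090"})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Pending estimated jobs per tenant, split and merge stages combined.
	result, warnings, err := v1.NewAPI(client).Query(ctx,
		`sum by (user) (cortex_bucket_index_estimated_compaction_jobs)`, time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result)
}
```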
125 changes: 113 additions & 12 deletions pkg/compactor/blocks_cleaner.go
@@ -43,6 +43,7 @@ type BlocksCleanerConfig struct {
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
DeleteBlocksConcurrency int
NoBlocksFileCleanupEnabled bool
CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs.
}

type BlocksCleaner struct {
@@ -60,18 +61,20 @@ type BlocksCleaner struct {
lastOwnedUsers []string

// Metrics.
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
partialBlocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantMarkedBlocks *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
bucketIndexCompactionJobs *prometheus.GaugeVec
bucketIndexCompactionPlanningErrors prometheus.Counter
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
@@ -137,6 +140,15 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, own
Name: "cortex_bucket_index_last_successful_update_timestamp_seconds",
Help: "Timestamp of the last successful update of a tenant's bucket index.",
}, []string{"user"}),

bucketIndexCompactionJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_index_estimated_compaction_jobs",
Help: "Estimated number of compaction jobs based on latest version of bucket index.",
}, []string{"user", "type"}),
bucketIndexCompactionPlanningErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_bucket_index_estimated_compaction_jobs_errors_total",
Help: "Total number of failed executions of compaction job estimation based on latest version of bucket index.",
}),
}

c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, c.stopping)
@@ -231,6 +243,8 @@ func (c *BlocksCleaner) refreshOwnedUsers(ctx context.Context) ([]string, map[st
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
}
c.lastOwnedUsers = allUsers
@@ -337,6 +351,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))

if deletedBlocks > 0 {
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks)
@@ -457,6 +473,21 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()

// Compute pending compaction jobs based on current index.
splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx)
if err != nil {
// When the compactor is shutting down, we get a context cancellation. There's no reason to report that as an error.
if !errors.Is(err, context.Canceled) {
level.Error(userLogger).Log("msg", "failed to compute compaction jobs from bucket index for user", "err", err)
c.bucketIndexCompactionPlanningErrors.Inc()
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
} else {
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs))
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageMerge)).Set(float64(mergeJobs))
}

return nil
}
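
One detail worth noting in the error path above: `DeleteLabelValues` removes the per-user series entirely rather than zeroing them, so a failed estimation stops exporting estimates instead of freezing the last good value. A minimal sketch of that behavior, with invented metric and label names:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	jobs := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_estimated_jobs", // invented name for this sketch
	}, []string{"user", "type"})

	// Success path: the gauge reports the latest estimate.
	jobs.WithLabelValues("user-1", "split").Set(3)
	fmt.Println(testutil.CollectAndCount(jobs)) // 1 series exported

	// Failure path: deleting the label values drops the series, so no
	// stale estimate lingers on the next scrape.
	jobs.DeleteLabelValues("user-1", "split")
	fmt.Println(testutil.CollectAndCount(jobs)) // 0 series exported
}
```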

@@ -638,3 +669,73 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
}
return lastModified, err
}

func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) {
metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)

// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
synced := newNoopGaugeVec()

for _, f := range []block.MetadataFilter{
// We don't include ShardAwareDeduplicateFilter, because it relies on the list of compaction sources, which is not present in the BucketIndex.
// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
NewNoCompactionMarkFilter(userBucket, true),
} {
err := f.Filter(ctx, metas, synced)
if err != nil {
return 0, 0, err
}
}

grouper := NewSplitAndMergeGrouper(userID, c.cfg.CompactionBlockRanges.ToMilliseconds(), uint32(c.cfgProvider.CompactorSplitAndMergeShards(userID)), uint32(c.cfgProvider.CompactorSplitGroups(userID)), log.NewNopLogger())
jobs, err := grouper.Groups(metas)
if err != nil {
return 0, 0, err
}

split := 0
merge := 0
for _, j := range jobs {
if j.UseSplitting() {
split++
} else {
merge++
}
}

return split, merge, nil
}

// convertBucketIndexToMetasForCompactionJobPlanning converts the bucket index into a map of block Metas, ignoring blocks marked for deletion.
func convertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
deletedULIDs := idx.BlockDeletionMarks.GetULIDs()
deleted := make(map[ulid.ULID]bool, len(deletedULIDs))
for _, id := range deletedULIDs {
deleted[id] = true
}

metas := map[ulid.ULID]*block.Meta{}
for _, b := range idx.Blocks {
if deleted[b.ID] {
continue
}
metas[b.ID] = b.ThanosMeta()
if metas[b.ID].Thanos.Labels == nil {
metas[b.ID].Thanos.Labels = map[string]string{}
}
metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning.
}
return metas
}

type noopGaugeVec struct {
g prometheus.Gauge
}

func newNoopGaugeVec() *noopGaugeVec {
return &noopGaugeVec{g: promauto.With(nil).NewGauge(prometheus.GaugeOpts{})}
}

func (n *noopGaugeVec) WithLabelValues(...string) prometheus.Gauge {
return n.g
}
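
The metadata filters only need something that can hand out a gauge per label combination, so `noopGaugeVec` satisfies them with a single unregistered gauge and the `synced` counts are simply discarded. A sketch of the assumed one-method shape (the real interface definition lives in the block package):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// gaugeVec mirrors the one-method shape the filters are assumed to consume.
type gaugeVec interface {
	WithLabelValues(lvs ...string) prometheus.Gauge
}

// noop mimics noopGaugeVec above: every label combination maps to one
// unregistered gauge, so observations are discarded.
type noop struct{ g prometheus.Gauge }

func (n *noop) WithLabelValues(...string) prometheus.Gauge { return n.g }

func main() {
	var synced gaugeVec = &noop{g: promauto.With(nil).NewGauge(prometheus.GaugeOpts{Name: "unused"})}
	synced.WithLabelValues("no-compact-marked").Set(42) // goes nowhere
	fmt.Println("no registry involved, nothing is exported")
}
```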
73 changes: 73 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
@@ -22,6 +22,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -203,10 +204,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 2
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

@@ -371,10 +379,17 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))

// Override the users scanner to reconfigure it to only return a subset of users.
@@ -396,10 +411,15 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

@@ -1011,6 +1031,59 @@ func TestStalePartialBlockLastModifiedTime(t *testing.T) {
}
}

func TestComputeCompactionJobs(t *testing.T) {
bucketClient, _ := mimir_testutil.PrepareFilesystemBucket(t)
bucketClient = block.BucketWithGlobalMarkers(bucketClient)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour},
}

const user = "test"

cfgProvider := newMockConfigProvider()
cfgProvider.splitGroups[user] = 0 // No grouping of split jobs: all blocks go into a single split-compaction job.
cfgProvider.splitAndMergeShards[user] = 3

twoHoursMS := 2 * time.Hour.Milliseconds()
dayMS := 24 * time.Hour.Milliseconds()

blockMarkedForNoCompact := ulid.MustNew(ulid.Now(), rand.Reader)

index := bucketindex.Index{}
index.Blocks = bucketindex.Blocks{
// Some 2h blocks that should be compacted together and split.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},

// Some merge jobs.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},

&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},

// No merge job for this shard: the second block is marked for no-compaction below, and a single remaining block doesn't form a job.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
}

userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)
// Mark block for no-compaction.
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil)
split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index)
require.NoError(t, err)
require.Equal(t, 1, split)
require.Equal(t, 2, merge)
}

type mockBucketFailure struct {
objstore.Bucket

1 change: 1 addition & 0 deletions pkg/compactor/compactor.go
@@ -493,6 +493,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled,
CompactionBlockRanges: c.compactorCfg.BlockRanges,
}, c.bucketClient, c.shardingStrategy.blocksCleanerOwnUser, c.cfgProvider, c.parentLogger, c.registerer)

// Start blocks cleaner asynchronously, don't wait until initial cleanup is finished.
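
`CompactionBlockRanges` carries the compactor's configured block ranges into the cleaner, where `estimateCompactionJobsFrom` converts them to milliseconds for the grouper. A small sketch with illustrative values (the 2h/12h/24h ranges are an assumption, not a statement about any deployment's config):

```go
package main

import (
	"fmt"
	"time"

	mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
)

func main() {
	// Illustrative ranges only; real values come from the compactor's
	// block-ranges configuration.
	ranges := mimir_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}

	// estimateCompactionJobsFrom hands the ranges to the grouper in
	// milliseconds via ToMilliseconds.
	fmt.Println(ranges.ToMilliseconds()) // [7200000 43200000 86400000]
}
```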
