From 5bf18f0e1f6e127bbea619c023f8443fcee4a0d3 Mon Sep 17 00:00:00 2001
From: Joe Elliott
Date: Mon, 9 Dec 2024 16:09:20 -0500
Subject: [PATCH] context cancel approach

Signed-off-by: Joe Elliott
---
 tempodb/compactor.go      | 148 ++++++++++++++++++++++++++------------
 tempodb/compactor_test.go |  28 +++++---
 tempodb/retention.go      |   2 +
 tempodb/tempodb_test.go   |   2 +-
 4 files changed, 124 insertions(+), 56 deletions(-)

diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 517844cf126..ddbbc906507 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -6,6 +6,7 @@ import (
     "fmt"
     "sort"
     "strconv"
+    "strings"
     "time"
 
     "github.com/go-kit/log/level"
@@ -70,6 +71,8 @@ var (
         Name:      "compaction_spans_combined_total",
         Help:      "Number of spans that are deduped per replication factor.",
     }, []string{"replication_factor"})
+
+    errCompactionJobNoLongerOwned = fmt.Errorf("compaction job no longer owned")
 )
 
 func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -81,23 +84,14 @@ func (rw *readerWriter) compactionLoop(ctx context.Context) {
     ticker := time.NewTicker(compactionCycle)
     defer ticker.Stop()
     for {
-        select {
-        case <-ctx.Done():
-            return
-        default:
-        }
-
-        select {
-        case <-ticker.C:
-            rw.doCompaction(ctx)
-        case <-ctx.Done():
-            return
-        }
+        doForAtLeast(compactionCycle, func() {
+            rw.compactOneTenant(ctx)
+        })
     }
 }
 
-// doCompaction runs a compaction cycle every 30s
-func (rw *readerWriter) doCompaction(ctx context.Context) {
+// compactOneTenant runs a single compaction cycle for one tenant
+func (rw *readerWriter) compactOneTenant(ctx context.Context) {
     // List of all tenants in the block list
     // The block list is updated by constant polling the storage for tenant indexes and/or tenant blocks (and building the index)
     tenants := rw.blocklist.Tenants()
@@ -142,51 +136,99 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
 
     start := time.Now()
 
-    level.Debug(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
+    level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
     for {
-        select {
-        case <-ctx.Done():
+        // this context is controlled by the service manager. it being cancelled means that the process is shutting down
+        if ctx.Err() != nil {
+            level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx))
+            return
+        }
+
+        // Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
+        toBeCompacted, hashString := blockSelector.BlocksToCompact()
+        if len(toBeCompacted) == 0 {
+            measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+
+            level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
             return
-        default:
-            // Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
-            toBeCompacted, hashString := blockSelector.BlocksToCompact()
-            if len(toBeCompacted) == 0 {
-                measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+        }
 
-                level.Debug(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
-                return
-            }
-            if !rw.compactorSharder.Owns(hashString) {
-                // continue on this tenant until we find something we own
-                continue
-            }
-            level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
-            // Compact selected blocks into a larger one
-            err := rw.compact(ctx, toBeCompacted, tenantID)
-
-            if errors.Is(err, backend.ErrDoesNotExist) {
-                level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
-            } else if err != nil {
-                level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
-                metricCompactionErrors.Inc()
-            }
+        owns := func() bool {
+            return rw.compactorSharder.Owns(hashString)
+        }
+        if !owns() {
+            // continue on this tenant until we find something we own
+            continue
+        }
 
-            if !rw.compactorSharder.Owns(hashString) {
-                level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
-            }
+        level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
+        err := rw.compactWhileOwns(ctx, toBeCompacted, tenantID, owns)
+
+        if errors.Is(err, backend.ErrDoesNotExist) {
+            level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
+        } else if err != nil {
+            level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
+            metricCompactionErrors.Inc()
+        }
 
-            // after a maintenance cycle bail out
-            if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
-                measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+        // after a maintenance cycle bail out
+        if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
+            measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
 
-                level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
+            level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
+            return
+        }
+    }
+}
+
+func (rw *readerWriter) compactWhileOwns(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, owns func() bool) error {
+    ownsCtx, cancel := context.WithCancelCause(ctx)
+
+    done := make(chan struct{})
+    defer close(done)
+
+    // every second test if we still own the job. if we don't then cancel the context with a cause
+    // that we can then test for
+    go func() {
+        ticker := time.NewTicker(1 * time.Second)
+
+        for {
+            if !owns() {
+                cancel(errCompactionJobNoLongerOwned)
+            }
+
+            select {
+            case <-ticker.C:
+            case <-done:
+                return
+            case <-ownsCtx.Done():
                 return
             }
         }
+    }()
+
+    err := rw.compactOneJob(ownsCtx, blockMetas, tenantID)
+    if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ownsCtx), errCompactionJobNoLongerOwned) {
+        level.Warn(rw.logger).Log("msg", "lost ownership of this job. abandoning job and trying again on this block list", "err", err)
+        return nil
     }
+
+    // test to see if we still own this job. it would be exceptional to log this message, but would be nice to know. a more likely bad case is that
+    // job ownership changes but that change has not yet propagated to this compactor, so it duplicated data w/o realizing it.
+    if !owns() {
+        // format a string with all input metas
+        sb := &strings.Builder{}
+        for _, meta := range blockMetas {
+            sb.WriteString(meta.BlockID.String())
+            sb.WriteString(", ")
+        }
+
+        level.Error(rw.logger).Log("msg", "lost ownership of this job after compaction. possible data duplication", "tenant", tenantID, "input_blocks", sb.String())
+    }
+
+    return err
 }
 
-func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
+func (rw *readerWriter) compactOneJob(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
     level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))
 
     // todo - add timeout?
@@ -380,3 +422,15 @@ func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte)
     }
     return b, wasCombined, err
 }
+
+// doForAtLeast executes the function f. It blocks for at least the passed duration but can go longer
+func doForAtLeast(dur time.Duration, f func()) {
+    startTime := time.Now()
+
+    f()
+    elapsed := time.Since(startTime)
+
+    if elapsed < dur {
+        time.Sleep(dur - elapsed)
+    }
+}
diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go
index 0cf76ee4230..c9175b20ee8 100644
--- a/tempodb/compactor_test.go
+++ b/tempodb/compactor_test.go
@@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) {
         require.Len(t, blocks, inputBlocks)
 
         compactions++
-        err := rw.compact(context.Background(), blocks, testTenantID)
+        err := rw.compactOneJob(context.Background(), blocks, testTenantID)
         require.NoError(t, err)
 
         expectedBlockCount -= blocksPerCompaction
@@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) {
     combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
     require.NoError(t, err)
 
-    err = rw.compact(ctx, blocks, testTenantID)
+    err = rw.compactOneJob(ctx, blocks, testTenantID)
     require.NoError(t, err)
 
     checkBlocklists(t, uuid.Nil, 1, blockCount, rw)
@@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
     rw.pollBlocklist()
 
     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+    err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     require.NoError(t, err)
 
     // New blocklist contains 1 compacted block with everything
@@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) {
     assert.NoError(t, err)
 
     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+    err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     assert.NoError(t, err)
 
     // Check metric
@@ -570,12 +570,12 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
 
     // Verify that tenant 2 compacted, tenant 1 is not
     // Compaction starts at index 1 for simplicity
-    rw.doCompaction(ctx)
+    rw.compactOneTenant(ctx)
     assert.Equal(t, 2, len(rw.blocklist.Metas(testTenantID)))
     assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
 
     // Verify both tenants compacted after second run
-    rw.doCompaction(ctx)
+    rw.compactOneTenant(ctx)
     assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID)))
     assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
 }
@@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str
     rw.pollBlocklist()
 
     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+    err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     require.NoError(t, err)
 
     time.Sleep(100 * time.Millisecond)
@@ -778,6 +778,18 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
     }
 }
 
+func TestDoForAtLeast(t *testing.T) {
+    // test that it runs for at least the duration
+    start := time.Now()
+    doForAtLeast(time.Second, func() { time.Sleep(time.Millisecond) })
+    require.WithinDuration(t, time.Now(), start.Add(time.Second), 10*time.Millisecond)
+
+    // test that it allows func to overrun
+    start = time.Now()
+    doForAtLeast(time.Second, func() { time.Sleep(2 * time.Second) })
+    require.WithinDuration(t, time.Now(), start.Add(2*time.Second), 10*time.Millisecond)
+}
+
 type testData struct {
     id common.ID
     t  *tempopb.Trace
@@ -894,6 +906,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {
 
     b.ResetTimer()
 
-    err = rw.compact(ctx, metas, testTenantID)
+    err = rw.compactOneJob(ctx, metas, testTenantID)
     require.NoError(b, err)
 }
diff --git a/tempodb/retention.go b/tempodb/retention.go
index fd7fd667cfd..bffcafadefa 100644
--- a/tempodb/retention.go
+++ b/tempodb/retention.go
@@ -12,6 +12,8 @@ import (
 )
 
 // retentionLoop watches a timer to clean up blocks that are past retention.
+// todo: correctly pass context all the way to the backend so a cancelled context can stop the retention loop.
+// see implementation of compactionLoop()
 func (rw *readerWriter) retentionLoop(ctx context.Context) {
     ticker := time.NewTicker(rw.cfg.BlocklistPoll)
     for {
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index d47374f9047..0d993a675aa 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) {
     // compact
     var blockMetas []*backend.BlockMeta
     blockMetas = append(blockMetas, complete.BlockMeta())
-    require.NoError(t, rw.compact(ctx, blockMetas, testTenantID))
+    require.NoError(t, rw.compactOneJob(ctx, blockMetas, testTenantID))
 
     // poll
     rw.pollBlocklist()
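
For reference, below is a minimal standalone sketch (not part of the patch) of the cancel-with-cause pattern that compactWhileOwns applies above: a watchdog goroutine polls an owns() callback and cancels a derived context with a sentinel error, and the caller uses context.Cause to tell "lost ownership" apart from an ordinary shutdown. The names watchOwnership and errNoLongerOwned are illustrative only, not identifiers from this patch (the patch's sentinel is errCompactionJobNoLongerOwned, and it uses a done channel rather than the derived context to stop the watchdog).

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errNoLongerOwned = errors.New("job no longer owned")

// watchOwnership returns a context that is cancelled with errNoLongerOwned
// once owns() reports false. stop() releases the watchdog goroutine.
func watchOwnership(parent context.Context, owns func() bool) (context.Context, func()) {
    cctx, cancel := context.WithCancelCause(parent)

    go func() {
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()

        for {
            if !owns() {
                cancel(errNoLongerOwned)
                return
            }
            select {
            case <-ticker.C:
            case <-cctx.Done():
                return
            }
        }
    }()

    return cctx, func() { cancel(nil) }
}

func main() {
    // pretend ownership is lost roughly 300ms into the job
    start := time.Now()
    owns := func() bool { return time.Since(start) < 300*time.Millisecond }

    ctx, stop := watchOwnership(context.Background(), owns)
    defer stop()

    // a long-running "job" that respects cancellation
    select {
    case <-time.After(2 * time.Second):
        fmt.Println("job finished")
    case <-ctx.Done():
        if errors.Is(context.Cause(ctx), errNoLongerOwned) {
            fmt.Println("abandoning job: ownership lost")
        } else {
            fmt.Println("job cancelled:", context.Cause(ctx))
        }
    }
}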