context cancel approach
Signed-off-by: Joe Elliott <number101010@gmail.com>
joe-elliott committed Dec 10, 2024
1 parent d0b6f9c commit 5bf18f0
Showing 4 changed files with 124 additions and 56 deletions.
148 changes: 101 additions & 47 deletions tempodb/compactor.go
@@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/go-kit/log/level"
@@ -70,6 +71,8 @@ var (
Name: "compaction_spans_combined_total",
Help: "Number of spans that are deduped per replication factor.",
}, []string{"replication_factor"})

errCompactionJobNoLongerOwned = fmt.Errorf("compaction job no longer owned")
)

func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -81,23 +84,14 @@ func (rw *readerWriter) compactionLoop(ctx context.Context) {
ticker := time.NewTicker(compactionCycle)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
default:
}

select {
case <-ticker.C:
rw.doCompaction(ctx)
case <-ctx.Done():
return
}
doForAtLeast(compactionCycle, func() {
rw.compactOneTenant(ctx)
})
}
}

// doCompaction runs a compaction cycle every 30s
func (rw *readerWriter) doCompaction(ctx context.Context) {
// compactOneTenant runs a compaction cycle every 30s
func (rw *readerWriter) compactOneTenant(ctx context.Context) {
// List of all tenants in the block list
// The block list is updated by constant polling the storage for tenant indexes and/or tenant blocks (and building the index)
tenants := rw.blocklist.Tenants()
@@ -142,51 +136,99 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {

start := time.Now()

level.Debug(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
for {
select {
case <-ctx.Done():
// this context is controlled by the service manager. it being cancelled means that the process is shutting down
if ctx.Err() != nil {
level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx))
}

// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
toBeCompacted, hashString := blockSelector.BlocksToCompact()
if len(toBeCompacted) == 0 {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)

level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
return
default:
// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
toBeCompacted, hashString := blockSelector.BlocksToCompact()
if len(toBeCompacted) == 0 {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
}

level.Debug(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
return
}
if !rw.compactorSharder.Owns(hashString) {
// continue on this tenant until we find something we own
continue
}
level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
// Compact selected blocks into a larger one
err := rw.compact(ctx, toBeCompacted, tenantID)

if errors.Is(err, backend.ErrDoesNotExist) {
level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
} else if err != nil {
level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
metricCompactionErrors.Inc()
}
owns := func() bool {
return rw.compactorSharder.Owns(hashString)
}
if !owns() {
// continue on this tenant until we find something we own
continue
}

if !rw.compactorSharder.Owns(hashString) {
level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
}
level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
err := rw.compactWhileOwns(ctx, toBeCompacted, tenantID, owns)

if errors.Is(err, backend.ErrDoesNotExist) {
level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
} else if err != nil {
level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
metricCompactionErrors.Inc()
}

// after a maintenance cycle bail out
if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
// after a maintenance cycle bail out
if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)

level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
return
}
}
}

func (rw *readerWriter) compactWhileOwns(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, owns func() bool) error {
ownsCtx, cancel := context.WithCancelCause(ctx)

done := make(chan struct{})
defer close(done)

// every second test if we still own the job. if we don't then cancel the context with a cause
// that we can then test for
go func() {
ticker := time.NewTicker(1 * time.Second)

for {
if !owns() {
cancel(errCompactionJobNoLongerOwned)
}

select {
case <-ticker.C:
case <-done:
return
case <-ownsCtx.Done():
return
}
}
}()

err := rw.compactOneJob(ownsCtx, blockMetas, tenantID)
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ownsCtx), errCompactionJobNoLongerOwned) {
level.Warn(rw.logger).Log("msg", "lost ownership of this job. abandoning job and trying again on this block list", "err", err)
return nil
}

// test to see if we still own this job. it would be exceptional to log this message, but would be nice to know. a more likely bad case is that
// job ownership changes but that change has not yet propagated to this compactor, so it duplicated data w/o realizing it.
if !owns() {
// format a string with all input metas
sb := &strings.Builder{}
for _, meta := range blockMetas {
sb.WriteString(meta.BlockID.String())
sb.WriteString(", ")
}

level.Error(rw.logger).Log("msg", "lost ownership of this job after compaction. possible data duplication", "tenant", tenantID, "input_blocks", sb.String())
}

return err
}
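
For reference, a minimal standalone sketch of the cancel-with-cause pattern compactWhileOwns uses: a watchdog goroutine cancels a derived context with a sentinel cause once ownership is lost, and the caller uses context.Cause to tell that cancellation apart from a process shutdown. All names below (watchRun, stillOwned, doWork, errLostOwnership) are hypothetical illustrations, not Tempo code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errLostOwnership = errors.New("lost ownership")

// watchRun runs doWork under a child context that a watchdog goroutine cancels
// with a sentinel cause as soon as stillOwned reports false.
func watchRun(ctx context.Context, stillOwned func() bool, doWork func(context.Context) error) error {
	runCtx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	done := make(chan struct{})
	defer close(done)

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			if !stillOwned() {
				cancel(errLostOwnership) // record *why* we cancelled
			}
			select {
			case <-ticker.C:
			case <-done:
				return
			case <-runCtx.Done():
				return
			}
		}
	}()

	err := doWork(runCtx)
	// A plain context.Canceled could also mean the parent is shutting down;
	// context.Cause distinguishes "we gave the job up" from that case.
	if errors.Is(err, context.Canceled) && errors.Is(context.Cause(runCtx), errLostOwnership) {
		return nil
	}
	return err
}

func main() {
	err := watchRun(context.Background(),
		func() bool { return true },
		func(ctx context.Context) error { time.Sleep(10 * time.Millisecond); return ctx.Err() })
	fmt.Println("err:", err)
}
```

Polling ownership on a one-second ticker, as the commit does, keeps the check cheap while bounding how long a compactor can keep writing after losing the job.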

func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
func (rw *readerWriter) compactOneJob(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))

// todo - add timeout?
@@ -380,3 +422,15 @@ func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte)
}
return b, wasCombined, err
}

// doForAtLeast executes the function f. It blocks for at least the passed duration but can go longer
func doForAtLeast(dur time.Duration, f func()) {
startTime := time.Now()

f()
elapsed := time.Since(startTime)

if elapsed < dur {
time.Sleep(dur - elapsed)
}
}
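
A standalone sketch of the pacing behavior doForAtLeast gives the compaction loop; the durations below are arbitrary and for illustration only. Fast cycles are padded out to the full interval, slow cycles are allowed to overrun instead of being cut short.

```go
package main

import (
	"fmt"
	"time"
)

// doForAtLeast mirrors the helper above: run f, then sleep out whatever is
// left of the interval, but never cut a long f short.
func doForAtLeast(dur time.Duration, f func()) {
	start := time.Now()
	f()
	if elapsed := time.Since(start); elapsed < dur {
		time.Sleep(dur - elapsed)
	}
}

func main() {
	// Fast work: the call still takes roughly the full 100ms interval.
	start := time.Now()
	doForAtLeast(100*time.Millisecond, func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("fast work took", time.Since(start))

	// Slow work: the call overruns the interval instead of being interrupted.
	start = time.Now()
	doForAtLeast(100*time.Millisecond, func() { time.Sleep(250 * time.Millisecond) })
	fmt.Println("slow work took", time.Since(start))
}
```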
28 changes: 20 additions & 8 deletions tempodb/compactor_test.go
@@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) {
require.Len(t, blocks, inputBlocks)

compactions++
err := rw.compact(context.Background(), blocks, testTenantID)
err := rw.compactOneJob(context.Background(), blocks, testTenantID)
require.NoError(t, err)

expectedBlockCount -= blocksPerCompaction
@@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) {
combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
require.NoError(t, err)

err = rw.compact(ctx, blocks, testTenantID)
err = rw.compactOneJob(ctx, blocks, testTenantID)
require.NoError(t, err)

checkBlocklists(t, uuid.Nil, 1, blockCount, rw)
@@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
rw.pollBlocklist()

// compact everything
err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
require.NoError(t, err)

// New blocklist contains 1 compacted block with everything
@@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) {
assert.NoError(t, err)

// compact everything
err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
assert.NoError(t, err)

// Check metric
@@ -570,12 +570,12 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {

// Verify that tenant 2 compacted, tenant 1 is not
// Compaction starts at index 1 for simplicity
rw.doCompaction(ctx)
rw.compactOneTenant(ctx)
assert.Equal(t, 2, len(rw.blocklist.Metas(testTenantID)))
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))

// Verify both tenants compacted after second run
rw.doCompaction(ctx)
rw.compactOneTenant(ctx)
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID)))
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
}
@@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str
rw.pollBlocklist()

// compact everything
err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
@@ -778,6 +778,18 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
}
}

func TestDoForAtLeast(t *testing.T) {
// test that it runs for at least the duration
start := time.Now()
doForAtLeast(time.Second, func() { time.Sleep(time.Millisecond) })
require.WithinDuration(t, time.Now(), start.Add(time.Second), 10*time.Millisecond)

// test that it allows func to overrun
start = time.Now()
doForAtLeast(time.Second, func() { time.Sleep(2 * time.Second) })
require.WithinDuration(t, time.Now(), start.Add(2*time.Second), 10*time.Millisecond)
}

type testData struct {
id common.ID
t *tempopb.Trace
@@ -894,6 +906,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {

b.ResetTimer()

err = rw.compact(ctx, metas, testTenantID)
err = rw.compactOneJob(ctx, metas, testTenantID)
require.NoError(b, err)
}
2 changes: 2 additions & 0 deletions tempodb/retention.go
@@ -12,6 +12,8 @@ import (
)

// retentionLoop watches a timer to clean up blocks that are past retention.
// todo: correctly pass context all the way to the backend so a cancelled context can stop the retention loop.
// see implementation of compactionLoop()
func (rw *readerWriter) retentionLoop(ctx context.Context) {
ticker := time.NewTicker(rw.cfg.BlocklistPoll)
for {
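
A hedged sketch of what the todo above could look like once the context reaches the backend: the loop stops on cancellation and hands the same context to each retention pass so in-flight backend calls can be aborted. The signature and doRetention callback below are assumptions for illustration, not the actual Tempo API.

```go
package main

import (
	"context"
	"time"
)

// retentionLoop is a hypothetical stand-in: it ticks until the context is
// cancelled and passes that context to every retention pass.
func retentionLoop(ctx context.Context, poll time.Duration, doRetention func(context.Context)) {
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			doRetention(ctx) // ctx must reach the backend calls for cancellation to take effect
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	retentionLoop(ctx, 10*time.Millisecond, func(context.Context) {})
}
```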
2 changes: 1 addition & 1 deletion tempodb/tempodb_test.go
@@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) {
// compact
var blockMetas []*backend.BlockMeta
blockMetas = append(blockMetas, complete.BlockMeta())
require.NoError(t, rw.compact(ctx, blockMetas, testTenantID))
require.NoError(t, rw.compactOneJob(ctx, blockMetas, testTenantID))

// poll
rw.pollBlocklist()
