Halt compaction if job is lost #4420

Merged 8 commits on Dec 11, 2024. Changes from all commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -103,6 +103,7 @@ querier:
* [BUGFIX] Initialize histogram buckets to 0 to avoid downsampling. [#4366](https://github.com/grafana/tempo/pull/4366) (@javiermolinar)
* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [BUGFIX] Fixed an issue in the generator where the first batch was counted 2x against a traces size. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)
* [BUGFIX] Unstable compactors can occasionally duplicate data. Check for job ownership during compaction and cancel a job if ownership changes. [#4420](https://github.com/grafana/tempo/pull/4420) (@joe-elliott)

# v2.6.1

156 changes: 110 additions & 46 deletions tempodb/compactor.go
@@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/go-kit/log/level"
@@ -70,6 +71,8 @@ var (
Name: "compaction_spans_combined_total",
Help: "Number of spans that are deduped per replication factor.",
}, []string{"replication_factor"})

errCompactionJobNoLongerOwned = fmt.Errorf("compaction job no longer owned")
)

func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -81,23 +84,14 @@ func (rw *readerWriter) compactionLoop(ctx context.Context) {
ticker := time.NewTicker(compactionCycle)
defer ticker.Stop()
for {
		doForAtLeast(ctx, compactionCycle, func() {
			rw.compactOneTenant(ctx)
		})
}
}

// compactOneTenant runs a compaction cycle every 30s
func (rw *readerWriter) compactOneTenant(ctx context.Context) {
// List of all tenants in the block list
// The block list is updated by constant polling the storage for tenant indexes and/or tenant blocks (and building the index)
tenants := rw.blocklist.Tenants()
@@ -142,51 +136,102 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {

start := time.Now()

	level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
	for {
		// this context is controlled by the service manager. it being cancelled means that the process is shutting down
		if ctx.Err() != nil {
			level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx))
			return
		}

		// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
		toBeCompacted, hashString := blockSelector.BlocksToCompact()
		if len(toBeCompacted) == 0 {
			measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)

			level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
			return
		}

		owns := func() bool {
			return rw.compactorSharder.Owns(hashString)
		}
		if !owns() {
			// continue on this tenant until we find something we own
			continue
		}

		level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
		err := rw.compactWhileOwns(ctx, toBeCompacted, tenantID, owns)

		if errors.Is(err, backend.ErrDoesNotExist) {
			level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
		} else if err != nil {
			level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
			metricCompactionErrors.Inc()
		}

		// after a maintenance cycle bail out
		if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
			measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)

			level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
			return
		}
	}
}

func (rw *readerWriter) compactWhileOwns(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, owns func() bool) error {
ownsCtx, cancel := context.WithCancelCause(ctx)

done := make(chan struct{})
defer close(done)

// every second test if we still own the job. if we don't then cancel the context with a cause
// that we can then test for
go func() {
ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for {
			if !owns() {
				cancel(errCompactionJobNoLongerOwned)
				return
			}

			select {
			case <-ticker.C:
			case <-done:
				return
			case <-ownsCtx.Done():
				return
			}
		}
	}()

	err := rw.compactOneJob(ownsCtx, blockMetas, tenantID)
	if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ownsCtx), errCompactionJobNoLongerOwned) {
		level.Warn(rw.logger).Log("msg", "lost ownership of this job. abandoning job and trying again on this block list", "err", err)
		return nil
	}

	// test to see if we still own this job. it would be exceptional to log this message, but would be nice to know. a more likely bad case is that
	// job ownership changes but that change has not yet propagated to this compactor, so it duplicated data w/o realizing it.
	if !owns() {
		// format a string with all input metas
		sb := &strings.Builder{}
		for _, meta := range blockMetas {
			sb.WriteString(meta.BlockID.String())
			sb.WriteString(", ")
		}

		level.Error(rw.logger).Log("msg", "lost ownership of this job after compaction. possible data duplication", "tenant", tenantID, "input_blocks", sb.String())
	}

	return err
}

Review comment (contributor): I see now that this 1 second is in conflict with the compactionCycle. If we have a compactionCycle of less than 1 second, we could continue compaction for one cycle more than desired. Is this a concern?

Reply from joe-elliott (Member, author), Dec 11, 2024: This doesn't block the return of the function, so it won't stop the compaction. If a compaction takes < 1s then compactWhileOwns will still return successfully and some short amount of time later this goroutine will exit.
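The cancellation mechanics above rely on the standard library's context.WithCancelCause and context.Cause (Go 1.20+). The standalone sketch below is illustrative only: errNoLongerOwned, doWork, and the fake ownership deadline are names invented for the example, not code from this PR. It shows the same pattern compactWhileOwns uses: a watcher goroutine cancels the work's context with a sentinel cause, the worker surfaces context.Canceled, and the caller distinguishes "lost ownership" from an ordinary shutdown by inspecting the cause, while the done channel lets the watcher exit as soon as the work finishes.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoLongerOwned mirrors errCompactionJobNoLongerOwned above; the name is invented for this sketch.
var errNoLongerOwned = errors.New("job no longer owned")

// doWork stands in for compactOneJob: it blocks until it finishes or its context is cancelled.
func doWork(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.Canceled when the watcher cancels us
	}
}

func main() {
	ownsCtx, cancel := context.WithCancelCause(context.Background())

	done := make(chan struct{})
	defer close(done)

	// pretend we lose ownership of the job roughly 300ms in
	deadline := time.Now().Add(300 * time.Millisecond)
	owns := func() bool { return time.Now().Before(deadline) }

	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			if !owns() {
				cancel(errNoLongerOwned) // cancel with a cause the caller can test for
				return
			}
			select {
			case <-ticker.C:
			case <-done: // work finished; stop watching promptly
				return
			case <-ownsCtx.Done():
				return
			}
		}
	}()

	err := doWork(ownsCtx)
	if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ownsCtx), errNoLongerOwned) {
		fmt.Println("lost ownership, abandoning work:", err)
		return
	}
	fmt.Println("work finished, err:", err)
}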

func (rw *readerWriter) compactOneJob(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))

// todo - add timeout?
@@ -380,3 +425,22 @@ func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte)
}
return b, wasCombined, err
}

// doForAtLeast executes the function f. It blocks for at least the passed duration but can go longer. if context is cancelled after
// the function is done we will bail immediately. in the current use case this means that the process is shutting down
// we don't force f() to cancel, we assume it also responds to the cancelled context
func doForAtLeast(ctx context.Context, dur time.Duration, f func()) {
startTime := time.Now()
f()
elapsed := time.Since(startTime)

if elapsed < dur {
ticker := time.NewTicker(dur - elapsed)
defer ticker.Stop()

select {
case <-ticker.C:
case <-ctx.Done():
}
}
}
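A minimal usage sketch of doForAtLeast, assuming it sits in the same package: pollTenant is a hypothetical stand-in for rw.compactOneTenant, invented for illustration. Each pass of the loop takes at least the requested duration even when the work finishes quickly, and a slower pass simply runs to completion.

// pollTenant is a hypothetical stand-in for rw.compactOneTenant.
func pollTenant(ctx context.Context) {
	// pretend the work usually takes ~250ms
	select {
	case <-time.After(250 * time.Millisecond):
	case <-ctx.Done():
	}
}

func runLoop(ctx context.Context) {
	for ctx.Err() == nil {
		// fast passes are stretched to ~1s; passes longer than 1s are not cut short
		doForAtLeast(ctx, time.Second, func() { pollTenant(ctx) })
	}
}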
39 changes: 31 additions & 8 deletions tempodb/compactor_test.go
@@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) {
require.Len(t, blocks, inputBlocks)

compactions++
err := rw.compactOneJob(context.Background(), blocks, testTenantID)
require.NoError(t, err)

expectedBlockCount -= blocksPerCompaction
@@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) {
combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
require.NoError(t, err)

err = rw.compactOneJob(ctx, blocks, testTenantID)
require.NoError(t, err)

checkBlocklists(t, uuid.Nil, 1, blockCount, rw)
@@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
rw.pollBlocklist()

// compact everything
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
require.NoError(t, err)

// New blocklist contains 1 compacted block with everything
@@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) {
assert.NoError(t, err)

// compact everything
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
assert.NoError(t, err)

// Check metric
@@ -570,12 +570,12 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {

// Verify that tenant 2 compacted, tenant 1 is not
// Compaction starts at index 1 for simplicity
rw.compactOneTenant(ctx)
assert.Equal(t, 2, len(rw.blocklist.Metas(testTenantID)))
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))

// Verify both tenants compacted after second run
rw.compactOneTenant(ctx)
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID)))
assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
}
@@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str
rw.pollBlocklist()

// compact everything
err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
@@ -778,6 +778,29 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
}
}

func TestDoForAtLeast(t *testing.T) {
// test that it runs for at least the duration
start := time.Now()
doForAtLeast(context.Background(), time.Second, func() { time.Sleep(time.Millisecond) })
require.WithinDuration(t, time.Now(), start.Add(time.Second), 100*time.Millisecond)

// test that it allows func to overrun
start = time.Now()
doForAtLeast(context.Background(), time.Second, func() { time.Sleep(2 * time.Second) })
require.WithinDuration(t, time.Now(), start.Add(2*time.Second), 100*time.Millisecond)

// make sure cancelling the context stops the function if the function is complete and we're
// just waiting. it is presumed, but not enforced, that the function responds to a cancelled context
ctx, cancel := context.WithCancel(context.Background())
start = time.Now()
go func() {
time.Sleep(time.Second)
cancel()
}()
doForAtLeast(ctx, 2*time.Second, func() {})
require.WithinDuration(t, time.Now(), start.Add(time.Second), 100*time.Millisecond)
}

type testData struct {
id common.ID
t *tempopb.Trace
@@ -894,6 +917,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {

b.ResetTimer()

err = rw.compactOneJob(ctx, metas, testTenantID)
require.NoError(b, err)
}
2 changes: 2 additions & 0 deletions tempodb/retention.go
@@ -12,6 +12,8 @@ import (
)

// retentionLoop watches a timer to clean up blocks that are past retention.
// todo: correctly pass context all the way to the backend so a cancelled context can stop the retention loop.
// see implementation of compactionLoop()
func (rw *readerWriter) retentionLoop(ctx context.Context) {
ticker := time.NewTicker(rw.cfg.BlocklistPoll)
for {
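A hypothetical sketch of where the TODO above might lead if retentionLoop adopted the same pacing and cancellation pattern as compactionLoop. The call rw.doRetention(ctx) and the use of rw.cfg.BlocklistPoll as the pacing interval are assumptions made for illustration, not code from this PR.

// Hypothetical only: mirror compactionLoop so a cancelled context stops the loop
// between passes; retention itself would still need to thread ctx down to the backend.
func (rw *readerWriter) retentionLoop(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			return // process is shutting down
		}
		doForAtLeast(ctx, rw.cfg.BlocklistPoll, func() {
			rw.doRetention(ctx) // assumed signature; today's retention does not take a context
		})
	}
}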
2 changes: 1 addition & 1 deletion tempodb/tempodb_test.go
@@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) {
// compact
var blockMetas []*backend.BlockMeta
blockMetas = append(blockMetas, complete.BlockMeta())
require.NoError(t, rw.compactOneJob(ctx, blockMetas, testTenantID))

// poll
rw.pollBlocklist()