Revert "first attempt - abandoned"
This reverts commit d29f5c0.
joe-elliott committed Dec 9, 2024
1 parent d29f5c0 · commit d0b6f9c
Showing 9 changed files with 16 additions and 171 deletions.
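
For context, the reverted commit d29f5c0 had threaded a job-ownership check through compaction: it added a Continue func() bool hook to the compaction options and a backend.ErrCompactionAbandoned sentinel error, and each encoding's compactor checked the hook before appending to and finishing an output block. The snippet below is a minimal, self-contained Go sketch of that pattern reconstructed from the diff that follows; aside from the Continue field and the ErrCompactionAbandoned message, the names are illustrative stand-ins rather than the actual Tempo types.

package main

import (
    "errors"
    "fmt"
)

// Sentinel error mirroring the one the reverted change added to tempodb/backend.
var ErrCompactionAbandoned = fmt.Errorf("compaction abandoned b/c we no longer own the job")

// compactionOptions stands in for the Continue hook the reverted change added
// to common.CompactionOptions.
type compactionOptions struct {
    // Continue reports whether compaction should keep going; the compactors
    // checked it before appending to and finishing each output block.
    Continue func() bool
}

// appendBlock sketches how a compactor consulted the hook and bailed out.
func appendBlock(opts compactionOptions) error {
    if !opts.Continue() {
        return ErrCompactionAbandoned
    }
    // ... flush buffered objects to the output block ...
    return nil
}

func main() {
    // In doCompaction the hook closed over the ring ownership check
    // (rw.compactorSharder.Owns(hashString)); hard-coded to false here to
    // exercise the abandonment path.
    ownsJob := func() bool { return false }

    if err := appendBlock(compactionOptions{Continue: ownsJob}); errors.Is(err, ErrCompactionAbandoned) {
        fmt.Println("compaction abandoned: job no longer owned")
    }
}

The revert removes all of this and returns to checking rw.compactorSharder.Owns(hashString) directly in doCompaction, logging a warning if ownership was lost while a compaction was in flight.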
9 changes: 4 additions & 5 deletions tempodb/backend/backend.go
@@ -19,11 +19,10 @@ const (
 )
 
 var (
-    ErrDoesNotExist        = fmt.Errorf("does not exist")
-    ErrEmptyTenantID       = fmt.Errorf("empty tenant id")
-    ErrEmptyBlockID        = fmt.Errorf("empty block id")
-    ErrBadSeedFile         = fmt.Errorf("bad seed file")
-    ErrCompactionAbandoned = fmt.Errorf("compaction abandoned b/c we no longer own the job")
+    ErrDoesNotExist  = fmt.Errorf("does not exist")
+    ErrEmptyTenantID = fmt.Errorf("empty tenant id")
+    ErrEmptyBlockID  = fmt.Errorf("empty block id")
+    ErrBadSeedFile   = fmt.Errorf("bad seed file")
 
     GlobalMaxBlockID = uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff")

19 changes: 5 additions & 14 deletions tempodb/compactor.go
@@ -156,31 +156,23 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
         level.Debug(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
         return
     }

-    ownsJob := func() bool {
-        return rw.compactorSharder.Owns(hashString)
-    }
-
-    if !ownsJob() {
+    if !rw.compactorSharder.Owns(hashString) {
         // continue on this tenant until we find something we own
         continue
     }
     level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
     // Compact selected blocks into a larger one
-    err := rw.compact(ctx, toBeCompacted, tenantID, ownsJob)
+    err := rw.compact(ctx, toBeCompacted, tenantID)

     if errors.Is(err, backend.ErrDoesNotExist) {
         level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
-    } else if errors.Is(err, backend.ErrCompactionAbandoned) {
-        level.Warn(rw.logger).Log("msg", "compaction abandoned b/c we no longer own the job", "hashString", hashString)
     } else if err != nil {
         level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
         metricCompactionErrors.Inc()
     }

-    // double check here. this case should be quite exceptional. did we somehow finish the job even though we don't own it?
-    if !ownsJob() && !errors.Is(err, backend.ErrCompactionAbandoned) {
-        level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString, "err", err)
+    if !rw.compactorSharder.Owns(hashString) {
+        level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
     }

     // after a maintenance cycle bail out
@@ -194,7 +186,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
     }
 }

-func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, ownsJob func() bool) error {
+func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
     level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))

     // todo - add timeout?
@@ -262,7 +254,6 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block
         OutputBlocks: outputBlocks,
         Combiner: combiner,
         MaxBytesPerTrace: rw.compactorOverrides.MaxBytesPerTraceForTenant(tenantID),
-        Continue: ownsJob,
         BytesWritten: func(compactionLevel, bytes int) {
             metricCompactionBytesWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(bytes))
         },
121 changes: 6 additions & 115 deletions tempodb/compactor_test.go
@@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) {
     require.Len(t, blocks, inputBlocks)

     compactions++
-    err := rw.compact(context.Background(), blocks, testTenantID, func() bool { return true })
+    err := rw.compact(context.Background(), blocks, testTenantID)
     require.NoError(t, err)

     expectedBlockCount -= blocksPerCompaction
@@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) {
     combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
     require.NoError(t, err)

-    err = rw.compact(ctx, blocks, testTenantID, func() bool { return true })
+    err = rw.compact(ctx, blocks, testTenantID)
     require.NoError(t, err)

     checkBlocklists(t, uuid.Nil, 1, blockCount, rw)
@@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
     rw.pollBlocklist()

     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true })
+    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     require.NoError(t, err)

     // New blocklist contains 1 compacted block with everything
@@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) {
     assert.NoError(t, err)

     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true })
+    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     assert.NoError(t, err)

     // Check metric
@@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str
     rw.pollBlocklist()

     // compact everything
-    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true })
+    err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
     require.NoError(t, err)

     time.Sleep(100 * time.Millisecond)
@@ -738,8 +738,6 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
         Combiner: model.StaticCombiner,
         MaxBytesPerTrace: 0,

-        Continue: func() bool { return true },
-
         // hook to drop the trace
         DropObject: func(id common.ID) bool {
             return bytes.Equal(id, dropID)
@@ -780,113 +778,6 @@ }
     }
 }

-func TestCompactionHaltsAndReturnsError(t *testing.T) {
-    for _, enc := range encoding.AllEncodings() {
-        version := enc.Version()
-        t.Run(version, func(t *testing.T) {
-            testCompactionHaltsAndReturnsError(t, version)
-        })
-    }
-}
-
-func testCompactionHaltsAndReturnsError(t *testing.T, targetBlockVersion string) {
-    tempDir := t.TempDir()
-
-    r, w, _, err := New(&Config{
-        Backend: backend.Local,
-        Pool: &pool.Config{
-            MaxWorkers: 10,
-            QueueDepth: 100,
-        },
-        Local: &local.Config{
-            Path: path.Join(tempDir, "traces"),
-        },
-        Block: &common.BlockConfig{
-            IndexDownsampleBytes: 11,
-            BloomFP: .01,
-            BloomShardSizeBytes: 100_000,
-            Version: targetBlockVersion,
-            Encoding: backend.EncSnappy,
-            IndexPageSizeBytes: 1000,
-            RowGroupSizeBytes: 30_000_000,
-        },
-        WAL: &wal.Config{
-            Filepath: path.Join(tempDir, "wal"),
-        },
-        BlocklistPoll: 0,
-    }, nil, log.NewNopLogger())
-    require.NoError(t, err)
-
-    wal := w.WAL()
-    require.NoError(t, err)
-
-    dec := model.MustNewSegmentDecoder(v1.Encoding)
-
-    recordCount := 100
-    allIDs := make([]common.ID, 0, recordCount)
-
-    // write a bunch of dummy data
-    blockID := backend.NewUUID()
-    meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: v1.Encoding}
-    head, err := wal.NewBlock(meta, v1.Encoding)
-    require.NoError(t, err)
-
-    for j := 0; j < recordCount; j++ {
-        id := test.ValidTraceID(nil)
-        allIDs = append(allIDs, id)
-
-        obj, err := dec.PrepareForWrite(test.MakeTrace(1, id), 0, 0)
-        require.NoError(t, err)
-
-        obj2, err := dec.ToObject([][]byte{obj})
-        require.NoError(t, err)
-
-        err = head.Append(id, obj2, 0, 0)
-        require.NoError(t, err, "unexpected error writing req")
-    }
-
-    firstBlock, err := w.CompleteBlock(context.Background(), head)
-    require.NoError(t, err)
-
-    // choose a random id to drop
-    dropID := allIDs[rand.Intn(len(allIDs))]
-
-    rw := r.(*readerWriter)
-    // force compact to a new block
-    opts := common.CompactionOptions{
-        BlockConfig: *rw.cfg.Block,
-        ChunkSizeBytes: DefaultChunkSizeBytes,
-        FlushSizeBytes: DefaultFlushSizeBytes,
-        IteratorBufferSize: DefaultIteratorBufferSize,
-        OutputBlocks: 1,
-        Combiner: model.StaticCombiner,
-        MaxBytesPerTrace: 0,
-
-        Continue: func() bool { return false }, // ask compaction to stop
-
-        // hook to drop the trace
-        DropObject: func(id common.ID) bool {
-            return bytes.Equal(id, dropID)
-        },
-
-        // setting to prevent panics.
-        BytesWritten: func(_, _ int) {},
-        ObjectsCombined: func(_, _ int) {},
-        ObjectsWritten: func(_, _ int) {},
-        SpansDiscarded: func(_, _, _ string, _ int) {},
-        DisconnectedTrace: func() {},
-        RootlessTrace: func() {},
-    }
-
-    enc, err := encoding.FromVersion(targetBlockVersion)
-    require.NoError(t, err)
-
-    compactor := enc.NewCompactor(opts)
-    _, err = compactor.Compact(context.Background(), log.NewNopLogger(), rw.r, rw.w, []*backend.BlockMeta{firstBlock.BlockMeta()})
-    require.Error(t, err)
-    require.ErrorIs(t, err, backend.ErrCompactionAbandoned)
-}
-
 type testData struct {
     id common.ID
     t *tempopb.Trace
@@ -1003,6 +894,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {

     b.ResetTimer()

-    err = rw.compact(ctx, metas, testTenantID, func() bool { return true })
+    err = rw.compact(ctx, metas, testTenantID)
     require.NoError(b, err)
 }
4 changes: 0 additions & 4 deletions tempodb/encoding/common/interfaces.go
@@ -78,10 +78,6 @@ type CompactionOptions struct {
     BlockConfig BlockConfig
     Combiner model.ObjectCombiner

-    // Continue is a function that can be used to check if the compaction should continue. This should
-    // be checked regularly during the compaction process.
-    Continue func() bool
-
     // DropObject can be used to drop a trace from the compaction process. Currently it only receives the ID
     // of the trace to be compacted. If the function returns true, the trace will be dropped.
     DropObject func(ID) bool
8 changes: 0 additions & 8 deletions tempodb/encoding/v2/compactor.go
@@ -139,10 +139,6 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
 func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock) (backend.AppendTracker, error) {
     compactionLevel := int(block.BlockMeta().CompactionLevel - 1)

-    if !c.opts.Continue() {
-        return nil, backend.ErrCompactionAbandoned
-    }
-
     if c.opts.ObjectsWritten != nil {
         c.opts.ObjectsWritten(compactionLevel, block.CurrentBufferedObjects())
     }
@@ -162,10 +158,6 @@ func (c *Compactor) finishBlock(ctx context.Context, w backend.Writer, tracker b
 func (c *Compactor) finishBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock, l log.Logger) error {
     level.Info(l).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     bytesFlushed, err := block.Complete(ctx, tracker, w)
     if err != nil {
         return err
8 changes: 0 additions & 8 deletions tempodb/encoding/vparquet2/compactor.go
@@ -218,10 +218,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     var (
         objs = block.CurrentBufferedObjects()
         vals = block.EstimatedBufferedBytes()
@@ -250,10 +246,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     bytesFlushed, err := block.Complete()
     if err != nil {
         return fmt.Errorf("error completing block: %w", err)
8 changes: 0 additions & 8 deletions tempodb/encoding/vparquet3/compactor.go
@@ -226,10 +226,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     var (
         objs = block.CurrentBufferedObjects()
         vals = block.EstimatedBufferedBytes()
@@ -258,10 +254,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     bytesFlushed, err := block.Complete()
     if err != nil {
         return fmt.Errorf("error completing block: %w", err)
8 changes: 0 additions & 8 deletions tempodb/encoding/vparquet4/compactor.go
@@ -230,10 +230,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     var (
         objs = block.CurrentBufferedObjects()
         vals = block.EstimatedBufferedBytes()
@@ -262,10 +258,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo
     _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock")
     defer span.End()

-    if !c.opts.Continue() {
-        return backend.ErrCompactionAbandoned
-    }
-
     bytesFlushed, err := block.Complete()
     if err != nil {
         return fmt.Errorf("error completing block: %w", err)
2 changes: 1 addition & 1 deletion tempodb/tempodb_test.go
@@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) {
     // compact
     var blockMetas []*backend.BlockMeta
     blockMetas = append(blockMetas, complete.BlockMeta())
-    require.NoError(t, rw.compact(ctx, blockMetas, testTenantID, func() bool { return true }))
+    require.NoError(t, rw.compact(ctx, blockMetas, testTenantID))

     // poll
     rw.pollBlocklist()
