From d0b6f9ccc3be412d32f2707150e14215ac8cd91e Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 9 Dec 2024 15:53:50 -0500 Subject: [PATCH] Revert "first attempt - abandoned" This reverts commit d29f5c05df903651601968504fb79c5bfe053c9f. --- tempodb/backend/backend.go | 9 +- tempodb/compactor.go | 19 +--- tempodb/compactor_test.go | 121 ++---------------------- tempodb/encoding/common/interfaces.go | 4 - tempodb/encoding/v2/compactor.go | 8 -- tempodb/encoding/vparquet2/compactor.go | 8 -- tempodb/encoding/vparquet3/compactor.go | 8 -- tempodb/encoding/vparquet4/compactor.go | 8 -- tempodb/tempodb_test.go | 2 +- 9 files changed, 16 insertions(+), 171 deletions(-) diff --git a/tempodb/backend/backend.go b/tempodb/backend/backend.go index 2cd5e0ae7c9..93a1f006d47 100644 --- a/tempodb/backend/backend.go +++ b/tempodb/backend/backend.go @@ -19,11 +19,10 @@ const ( ) var ( - ErrDoesNotExist = fmt.Errorf("does not exist") - ErrEmptyTenantID = fmt.Errorf("empty tenant id") - ErrEmptyBlockID = fmt.Errorf("empty block id") - ErrBadSeedFile = fmt.Errorf("bad seed file") - ErrCompactionAbandoned = fmt.Errorf("compaction abandoned b/c we no longer own the job") + ErrDoesNotExist = fmt.Errorf("does not exist") + ErrEmptyTenantID = fmt.Errorf("empty tenant id") + ErrEmptyBlockID = fmt.Errorf("empty block id") + ErrBadSeedFile = fmt.Errorf("bad seed file") GlobalMaxBlockID = uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff") diff --git a/tempodb/compactor.go b/tempodb/compactor.go index f94ba57f8ca..517844cf126 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -156,31 +156,23 @@ func (rw *readerWriter) doCompaction(ctx context.Context) { level.Debug(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID) return } - - ownsJob := func() bool { - return rw.compactorSharder.Owns(hashString) - } - - if !ownsJob() { + if !rw.compactorSharder.Owns(hashString) { // continue on this tenant until we find something we own continue } level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString) // Compact selected blocks into a larger one - err := rw.compact(ctx, toBeCompacted, tenantID, ownsJob) + err := rw.compact(ctx, toBeCompacted, tenantID) if errors.Is(err, backend.ErrDoesNotExist) { level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err) - } else if errors.Is(err, backend.ErrCompactionAbandoned) { - level.Warn(rw.logger).Log("msg", "compaction abandoned b/c we no longer own the job", "hashString", hashString) } else if err != nil { level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err) metricCompactionErrors.Inc() } - // double check here. this case should be quite exceptional. did we somehow finish the job even though we don't own it? - if !ownsJob() && !errors.Is(err, backend.ErrCompactionAbandoned) { - level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString, "err", err) + if !rw.compactorSharder.Owns(hashString) { + level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString) } // after a maintenance cycle bail out @@ -194,7 +186,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) { } } -func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, ownsJob func() bool) error { +func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error { level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas)) // todo - add timeout? @@ -262,7 +254,6 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block OutputBlocks: outputBlocks, Combiner: combiner, MaxBytesPerTrace: rw.compactorOverrides.MaxBytesPerTraceForTenant(tenantID), - Continue: ownsJob, BytesWritten: func(compactionLevel, bytes int) { metricCompactionBytesWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(bytes)) }, diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 4c9a8bfcd31..0cf76ee4230 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) { require.Len(t, blocks, inputBlocks) compactions++ - err := rw.compact(context.Background(), blocks, testTenantID, func() bool { return true }) + err := rw.compact(context.Background(), blocks, testTenantID) require.NoError(t, err) expectedBlockCount -= blocksPerCompaction @@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) { combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0") require.NoError(t, err) - err = rw.compact(ctx, blocks, testTenantID, func() bool { return true }) + err = rw.compact(ctx, blocks, testTenantID) require.NoError(t, err) checkBlocklists(t, uuid.Nil, 1, blockCount, rw) @@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { rw.pollBlocklist() // compact everything - err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true }) + err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID) require.NoError(t, err) // New blocklist contains 1 compacted block with everything @@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) { assert.NoError(t, err) // compact everything - err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true }) + err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID) assert.NoError(t, err) // Check metric @@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str rw.pollBlocklist() // compact everything - err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID, func() bool { return true }) + err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID) require.NoError(t, err) time.Sleep(100 * time.Millisecond) @@ -738,8 +738,6 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) { Combiner: model.StaticCombiner, MaxBytesPerTrace: 0, - Continue: func() bool { return true }, - // hook to drop the trace DropObject: func(id common.ID) bool { return bytes.Equal(id, dropID) @@ -780,113 +778,6 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) { } } -func TestCompactionHaltsAndReturnsError(t *testing.T) { - for _, enc := range encoding.AllEncodings() { - version := enc.Version() - t.Run(version, func(t *testing.T) { - testCompactionHaltsAndReturnsError(t, version) - }) - } -} - -func testCompactionHaltsAndReturnsError(t *testing.T, targetBlockVersion string) { - tempDir := t.TempDir() - - r, w, _, err := New(&Config{ - Backend: backend.Local, - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &common.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - BloomShardSizeBytes: 100_000, - Version: targetBlockVersion, - Encoding: backend.EncSnappy, - IndexPageSizeBytes: 1000, - RowGroupSizeBytes: 30_000_000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, nil, log.NewNopLogger()) - require.NoError(t, err) - - wal := w.WAL() - require.NoError(t, err) - - dec := model.MustNewSegmentDecoder(v1.Encoding) - - recordCount := 100 - allIDs := make([]common.ID, 0, recordCount) - - // write a bunch of dummy data - blockID := backend.NewUUID() - meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: v1.Encoding} - head, err := wal.NewBlock(meta, v1.Encoding) - require.NoError(t, err) - - for j := 0; j < recordCount; j++ { - id := test.ValidTraceID(nil) - allIDs = append(allIDs, id) - - obj, err := dec.PrepareForWrite(test.MakeTrace(1, id), 0, 0) - require.NoError(t, err) - - obj2, err := dec.ToObject([][]byte{obj}) - require.NoError(t, err) - - err = head.Append(id, obj2, 0, 0) - require.NoError(t, err, "unexpected error writing req") - } - - firstBlock, err := w.CompleteBlock(context.Background(), head) - require.NoError(t, err) - - // choose a random id to drop - dropID := allIDs[rand.Intn(len(allIDs))] - - rw := r.(*readerWriter) - // force compact to a new block - opts := common.CompactionOptions{ - BlockConfig: *rw.cfg.Block, - ChunkSizeBytes: DefaultChunkSizeBytes, - FlushSizeBytes: DefaultFlushSizeBytes, - IteratorBufferSize: DefaultIteratorBufferSize, - OutputBlocks: 1, - Combiner: model.StaticCombiner, - MaxBytesPerTrace: 0, - - Continue: func() bool { return false }, // ask compaction to stop - - // hook to drop the trace - DropObject: func(id common.ID) bool { - return bytes.Equal(id, dropID) - }, - - // setting to prevent panics. - BytesWritten: func(_, _ int) {}, - ObjectsCombined: func(_, _ int) {}, - ObjectsWritten: func(_, _ int) {}, - SpansDiscarded: func(_, _, _ string, _ int) {}, - DisconnectedTrace: func() {}, - RootlessTrace: func() {}, - } - - enc, err := encoding.FromVersion(targetBlockVersion) - require.NoError(t, err) - - compactor := enc.NewCompactor(opts) - _, err = compactor.Compact(context.Background(), log.NewNopLogger(), rw.r, rw.w, []*backend.BlockMeta{firstBlock.BlockMeta()}) - require.Error(t, err) - require.ErrorIs(t, err, backend.ErrCompactionAbandoned) -} - type testData struct { id common.ID t *tempopb.Trace @@ -1003,6 +894,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) { b.ResetTimer() - err = rw.compact(ctx, metas, testTenantID, func() bool { return true }) + err = rw.compact(ctx, metas, testTenantID) require.NoError(b, err) } diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index d1bae68022f..ef3cd6b9969 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -78,10 +78,6 @@ type CompactionOptions struct { BlockConfig BlockConfig Combiner model.ObjectCombiner - // Continue is a function that can be used to check if the compaction should continue. This should - // be checked regularly during the compaction process. - Continue func() bool - // DropObject can be used to drop a trace from the compaction process. Currently it only receives the ID // of the trace to be compacted. If the function returns true, the trace will be dropped. DropObject func(ID) bool diff --git a/tempodb/encoding/v2/compactor.go b/tempodb/encoding/v2/compactor.go index 6eb196caa33..f9731dd4db2 100644 --- a/tempodb/encoding/v2/compactor.go +++ b/tempodb/encoding/v2/compactor.go @@ -139,10 +139,6 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock) (backend.AppendTracker, error) { compactionLevel := int(block.BlockMeta().CompactionLevel - 1) - if !c.opts.Continue() { - return nil, backend.ErrCompactionAbandoned - } - if c.opts.ObjectsWritten != nil { c.opts.ObjectsWritten(compactionLevel, block.CurrentBufferedObjects()) } @@ -162,10 +158,6 @@ func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker b func (c *Compactor) finishBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock, l log.Logger) error { level.Info(l).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta())) - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - bytesFlushed, err := block.Complete(ctx, tracker, w) if err != nil { return err diff --git a/tempodb/encoding/vparquet2/compactor.go b/tempodb/encoding/vparquet2/compactor.go index e3c8e587f73..ccf1e2f87f8 100644 --- a/tempodb/encoding/vparquet2/compactor.go +++ b/tempodb/encoding/vparquet2/compactor.go @@ -218,10 +218,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - var ( objs = block.CurrentBufferedObjects() vals = block.EstimatedBufferedBytes() @@ -250,10 +246,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - bytesFlushed, err := block.Complete() if err != nil { return fmt.Errorf("error completing block: %w", err) diff --git a/tempodb/encoding/vparquet3/compactor.go b/tempodb/encoding/vparquet3/compactor.go index cf759965d94..7d66cadcf43 100644 --- a/tempodb/encoding/vparquet3/compactor.go +++ b/tempodb/encoding/vparquet3/compactor.go @@ -226,10 +226,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - var ( objs = block.CurrentBufferedObjects() vals = block.EstimatedBufferedBytes() @@ -258,10 +254,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - bytesFlushed, err := block.Complete() if err != nil { return fmt.Errorf("error completing block: %w", err) diff --git a/tempodb/encoding/vparquet4/compactor.go b/tempodb/encoding/vparquet4/compactor.go index 61552a95143..3b7d258eb19 100644 --- a/tempodb/encoding/vparquet4/compactor.go +++ b/tempodb/encoding/vparquet4/compactor.go @@ -230,10 +230,6 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - var ( objs = block.CurrentBufferedObjects() vals = block.EstimatedBufferedBytes() @@ -262,10 +258,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") defer span.End() - if !c.opts.Continue() { - return backend.ErrCompactionAbandoned - } - bytesFlushed, err := block.Complete() if err != nil { return fmt.Errorf("error completing block: %w", err) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 2b110b7c329..d47374f9047 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) { // compact var blockMetas []*backend.BlockMeta blockMetas = append(blockMetas, complete.BlockMeta()) - require.NoError(t, rw.compact(ctx, blockMetas, testTenantID, func() bool { return true })) + require.NoError(t, rw.compact(ctx, blockMetas, testTenantID)) // poll rw.pollBlocklist()