diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index ba3a3faaf6f..b000f91d930 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -780,6 +780,113 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) { } } +func TestCompactionHaltsAndReturnsError(t *testing.T) { + for _, enc := range encoding.AllEncodings() { + version := enc.Version() + t.Run(version, func(t *testing.T) { + testCompactionHaltsAndReturnsError(t, version) + }) + } +} + +func testCompactionHaltsAndReturnsError(t *testing.T, targetBlockVersion string) { + tempDir := t.TempDir() + + r, w, _, err := New(&Config{ + Backend: backend.Local, + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &common.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomShardSizeBytes: 100_000, + Version: targetBlockVersion, + Encoding: backend.EncSnappy, + IndexPageSizeBytes: 1000, + RowGroupSizeBytes: 30_000_000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, nil, log.NewNopLogger()) + require.NoError(t, err) + + wal := w.WAL() + require.NoError(t, err) + + dec := model.MustNewSegmentDecoder(v1.Encoding) + + recordCount := 100 + allIDs := make([]common.ID, 0, recordCount) + + // write a bunch of dummy data + blockID := backend.NewUUID() + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: v1.Encoding} + head, err := wal.NewBlock(meta, v1.Encoding) + require.NoError(t, err) + + for j := 0; j < recordCount; j++ { + id := test.ValidTraceID(nil) + allIDs = append(allIDs, id) + + obj, err := dec.PrepareForWrite(test.MakeTrace(1, id), 0, 0) + require.NoError(t, err) + + obj2, err := dec.ToObject([][]byte{obj}) + require.NoError(t, err) + + err = head.Append(id, obj2, 0, 0) + require.NoError(t, err, "unexpected error writing req") + } + + firstBlock, err := w.CompleteBlock(context.Background(), head) + require.NoError(t, err) + + // choose a random id to drop + dropID := allIDs[rand.Intn(len(allIDs))] + + rw := r.(*readerWriter) + // force compact to a new block + opts := common.CompactionOptions{ + BlockConfig: *rw.cfg.Block, + ChunkSizeBytes: DefaultChunkSizeBytes, + FlushSizeBytes: DefaultFlushSizeBytes, + IteratorBufferSize: DefaultIteratorBufferSize, + OutputBlocks: 1, + Combiner: model.StaticCombiner, + MaxBytesPerTrace: 0, + + Continue: func() bool { return false }, // ask compaction to stop + + // hook to drop the trace + DropObject: func(id common.ID) bool { + return bytes.Equal(id, dropID) + }, + + // setting to prevent panics. + BytesWritten: func(_, _ int) {}, + ObjectsCombined: func(_, _ int) {}, + ObjectsWritten: func(_, _ int) {}, + SpansDiscarded: func(_, _, _ string, _ int) {}, + DisconnectedTrace: func() {}, + RootlessTrace: func() {}, + } + + enc, err := encoding.FromVersion(targetBlockVersion) + require.NoError(t, err) + + compactor := enc.NewCompactor(opts) + _, err = compactor.Compact(context.Background(), log.NewNopLogger(), rw.r, rw.w, []*backend.BlockMeta{firstBlock.BlockMeta()}) + require.Error(t, err) + require.ErrorIs(t, err, backend.ErrCompactionAbandoned) +} + type testData struct { id common.ID t *tempopb.Trace diff --git a/tempodb/encoding/v2/compactor.go b/tempodb/encoding/v2/compactor.go index f9731dd4db2..6eb196caa33 100644 --- a/tempodb/encoding/v2/compactor.go +++ b/tempodb/encoding/v2/compactor.go @@ -139,6 +139,10 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock) (backend.AppendTracker, error) { compactionLevel := int(block.BlockMeta().CompactionLevel - 1) + if !c.opts.Continue() { + return nil, backend.ErrCompactionAbandoned + } + if c.opts.ObjectsWritten != nil { c.opts.ObjectsWritten(compactionLevel, block.CurrentBufferedObjects()) } @@ -158,6 +162,10 @@ func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker b func (c *Compactor) finishBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock, l log.Logger) error { level.Info(l).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta())) + if !c.opts.Continue() { + return backend.ErrCompactionAbandoned + } + bytesFlushed, err := block.Complete(ctx, tracker, w) if err != nil { return err diff --git a/tempodb/encoding/vparquet2/compactor.go b/tempodb/encoding/vparquet2/compactor.go index ccf1e2f87f8..e3c8e587f73 100644 --- a/tempodb/encoding/vparquet2/compactor.go +++ b/tempodb/encoding/vparquet2/compactor.go @@ -218,6 +218,10 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") defer span.End() + if !c.opts.Continue() { + return backend.ErrCompactionAbandoned + } + var ( objs = block.CurrentBufferedObjects() vals = block.EstimatedBufferedBytes() @@ -246,6 +250,10 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") defer span.End() + if !c.opts.Continue() { + return backend.ErrCompactionAbandoned + } + bytesFlushed, err := block.Complete() if err != nil { return fmt.Errorf("error completing block: %w", err) diff --git a/tempodb/encoding/vparquet3/compactor.go b/tempodb/encoding/vparquet3/compactor.go index 7d66cadcf43..cf759965d94 100644 --- a/tempodb/encoding/vparquet3/compactor.go +++ b/tempodb/encoding/vparquet3/compactor.go @@ -226,6 +226,10 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") defer span.End() + if !c.opts.Continue() { + return backend.ErrCompactionAbandoned + } + var ( objs = block.CurrentBufferedObjects() vals = block.EstimatedBufferedBytes() @@ -254,6 +258,10 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") defer span.End() + if !c.opts.Continue() { + return backend.ErrCompactionAbandoned + } + bytesFlushed, err := block.Complete() if err != nil { return fmt.Errorf("error completing block: %w", err)