Skip to content

Commit

Permalink
add abandon code to all encodings
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Dec 5, 2024
1 parent 7d2de68 commit 0b3d145
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 0 deletions.
107 changes: 107 additions & 0 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,113 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
}
}

func TestCompactionHaltsAndReturnsError(t *testing.T) {
for _, enc := range encoding.AllEncodings() {
version := enc.Version()
t.Run(version, func(t *testing.T) {
testCompactionHaltsAndReturnsError(t, version)
})
}
}

func testCompactionHaltsAndReturnsError(t *testing.T, targetBlockVersion string) {
tempDir := t.TempDir()

r, w, _, err := New(&Config{
Backend: backend.Local,
Pool: &pool.Config{
MaxWorkers: 10,
QueueDepth: 100,
},
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
Block: &common.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Version: targetBlockVersion,
Encoding: backend.EncSnappy,
IndexPageSizeBytes: 1000,
RowGroupSizeBytes: 30_000_000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: 0,
}, nil, log.NewNopLogger())
require.NoError(t, err)

wal := w.WAL()
require.NoError(t, err)

dec := model.MustNewSegmentDecoder(v1.Encoding)

recordCount := 100
allIDs := make([]common.ID, 0, recordCount)

// write a bunch of dummy data
blockID := backend.NewUUID()
meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: v1.Encoding}
head, err := wal.NewBlock(meta, v1.Encoding)
require.NoError(t, err)

for j := 0; j < recordCount; j++ {
id := test.ValidTraceID(nil)
allIDs = append(allIDs, id)

obj, err := dec.PrepareForWrite(test.MakeTrace(1, id), 0, 0)
require.NoError(t, err)

obj2, err := dec.ToObject([][]byte{obj})
require.NoError(t, err)

err = head.Append(id, obj2, 0, 0)
require.NoError(t, err, "unexpected error writing req")
}

firstBlock, err := w.CompleteBlock(context.Background(), head)
require.NoError(t, err)

// choose a random id to drop
dropID := allIDs[rand.Intn(len(allIDs))]

rw := r.(*readerWriter)
// force compact to a new block
opts := common.CompactionOptions{
BlockConfig: *rw.cfg.Block,
ChunkSizeBytes: DefaultChunkSizeBytes,
FlushSizeBytes: DefaultFlushSizeBytes,
IteratorBufferSize: DefaultIteratorBufferSize,
OutputBlocks: 1,
Combiner: model.StaticCombiner,
MaxBytesPerTrace: 0,

Continue: func() bool { return false }, // ask compaction to stop

// hook to drop the trace
DropObject: func(id common.ID) bool {
return bytes.Equal(id, dropID)
},

// setting to prevent panics.
BytesWritten: func(_, _ int) {},
ObjectsCombined: func(_, _ int) {},
ObjectsWritten: func(_, _ int) {},
SpansDiscarded: func(_, _, _ string, _ int) {},
DisconnectedTrace: func() {},
RootlessTrace: func() {},
}

enc, err := encoding.FromVersion(targetBlockVersion)
require.NoError(t, err)

compactor := enc.NewCompactor(opts)
_, err = compactor.Compact(context.Background(), log.NewNopLogger(), rw.r, rw.w, []*backend.BlockMeta{firstBlock.BlockMeta()})
require.Error(t, err)
require.ErrorIs(t, err, backend.ErrCompactionAbandoned)
}

type testData struct {
id common.ID
t *tempopb.Trace
Expand Down
8 changes: 8 additions & 0 deletions tempodb/encoding/v2/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock) (backend.AppendTracker, error) {
compactionLevel := int(block.BlockMeta().CompactionLevel - 1)

if !c.opts.Continue() {
return nil, backend.ErrCompactionAbandoned
}

if c.opts.ObjectsWritten != nil {
c.opts.ObjectsWritten(compactionLevel, block.CurrentBufferedObjects())
}
Expand All @@ -158,6 +162,10 @@ func (c *Compactor) appendBlock(ctx context.Context, w backend.Writer, tracker b
func (c *Compactor) finishBlock(ctx context.Context, w backend.Writer, tracker backend.AppendTracker, block *StreamingBlock, l log.Logger) error {
level.Info(l).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))

if !c.opts.Continue() {
return backend.ErrCompactionAbandoned
}

bytesFlushed, err := block.Complete(ctx, tracker, w)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions tempodb/encoding/vparquet2/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo
_, span := tracer.Start(ctx, "vparquet.compactor.appendBlock")
defer span.End()

if !c.opts.Continue() {
return backend.ErrCompactionAbandoned
}

var (
objs = block.CurrentBufferedObjects()
vals = block.EstimatedBufferedBytes()
Expand Down Expand Up @@ -246,6 +250,10 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo
_, span := tracer.Start(ctx, "vparquet.compactor.finishBlock")
defer span.End()

if !c.opts.Continue() {
return backend.ErrCompactionAbandoned
}

bytesFlushed, err := block.Complete()
if err != nil {
return fmt.Errorf("error completing block: %w", err)
Expand Down
8 changes: 8 additions & 0 deletions tempodb/encoding/vparquet3/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l lo
_, span := tracer.Start(ctx, "vparquet.compactor.appendBlock")
defer span.End()

if !c.opts.Continue() {
return backend.ErrCompactionAbandoned
}

var (
objs = block.CurrentBufferedObjects()
vals = block.EstimatedBufferedBytes()
Expand Down Expand Up @@ -254,6 +258,10 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo
_, span := tracer.Start(ctx, "vparquet.compactor.finishBlock")
defer span.End()

if !c.opts.Continue() {
return backend.ErrCompactionAbandoned
}

bytesFlushed, err := block.Complete()
if err != nil {
return fmt.Errorf("error completing block: %w", err)
Expand Down

0 comments on commit 0b3d145

Please sign in to comment.