Skip to content

Commit

Permalink
Ship chunkrefs in task payload
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Jul 26, 2024
1 parent 8a3ae22 commit d07adce
Show file tree
Hide file tree
Showing 6 changed files with 539 additions and 72 deletions.
12 changes: 3 additions & 9 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (b *Builder) processTask(
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation.
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
Expand Down Expand Up @@ -454,15 +454,9 @@ func (b *Builder) processTask(
func (b *Builder) loadWorkForGap(
ctx context.Context,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))

// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
Expand Down
27 changes: 21 additions & 6 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
// This is a performance optimization to avoid expensive re-indexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.GapWithBlocks
gaps []protos.Gap
}

func (p *Planner) findOutdatedGaps(
Expand Down Expand Up @@ -690,7 +690,7 @@ func (p *Planner) findOutdatedGaps(
return nil, nil
}

work, err := blockPlansForGaps(tsdbsWithGaps, metas)
work, err := blockPlansForGaps(ctx, tenant, table, p.tsdbStore, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
Expand Down Expand Up @@ -743,22 +743,37 @@ func gapsBetweenTSDBsAndMetas(
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
func blockPlansForGaps(
ctx context.Context,
tenant string,
table config.DayTable,
store common.TSDBStore,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := protos.GapWithBlocks{
planGap := protos.Gap{
Bounds: gap,
}

for _, meta := range metas {
seriesItr, err := store.LoadTSDB(ctx, table, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}

for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
Expand Down
74 changes: 66 additions & 8 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -220,9 +221,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
},
},
},
Expand All @@ -238,9 +240,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
},
},
Expand All @@ -261,9 +264,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
},
},
},
Expand All @@ -280,9 +284,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
},
},
Expand All @@ -306,31 +311,35 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
{
Bounds: v1.NewBounds(3, 5),
Series: genSeries(v1.NewBounds(3, 5)),
Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
},
{
Bounds: v1.NewBounds(9, 10),
Series: genSeries(v1.NewBounds(9, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
},
},
},
// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
{
tsdb: tsdbID(1),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 2),
Series: genSeries(v1.NewBounds(0, 2)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
},
},
{
Bounds: v1.NewBounds(6, 7),
Series: genSeries(v1.NewBounds(6, 7)),
Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
},
},
Expand All @@ -354,9 +363,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
Expand All @@ -369,20 +379,68 @@ func Test_blockPlansForGaps(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
// We add series spanning the whole FP ownership range
tsdbStore := newFakeTsdbStore(genSeries(tc.ownershipRange))

// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
// separately and it's used to generate input in our regular code path (easier to write tests this way).
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas)
require.NoError(t, err)

plans, err := blockPlansForGaps(gaps, tc.metas)
plans, err := blockPlansForGaps(
context.Background(),
"fakeTenant",
config.NewDayTable(testDay, "fake"),
tsdbStore,
gaps,
tc.metas,
)
if tc.err {
require.Error(t, err)
return
}
require.Equal(t, tc.exp, plans)
})
}
}

// genSeries returns one series for every fingerprint in bounds, with both
// bounds.Min and bounds.Max included. Each series carries the same single
// placeholder chunk ref (From 0, Through 1, Checksum 1).
func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
	count := int(bounds.Max - bounds.Min + 1)
	out := make([]*v1.Series, 0, count)

	for fp := bounds.Min; fp <= bounds.Max; fp++ {
		s := &v1.Series{
			Fingerprint: fp,
			Chunks:      v1.ChunkRefs{{From: 0, Through: 1, Checksum: 1}},
		}
		out = append(out, s)
	}

	return out
}

// fakeTsdbStore is a test double for common.TSDBStore that serves series from
// an in-memory slice instead of a real TSDB.
type fakeTsdbStore struct {
// Embedded so the fake satisfies the whole interface. Nothing in the visible
// code assigns it, so only methods defined directly on fakeTsdbStore are
// safe to call.
common.TSDBStore

// series is the full set of series that LoadTSDB filters by fingerprint bounds.
series []*v1.Series
}

// newFakeTsdbStore returns a fakeTsdbStore backed by the given series slice.
// The slice is stored as-is, not copied.
func newFakeTsdbStore(series []*v1.Series) *fakeTsdbStore {
	store := new(fakeTsdbStore)
	store.series = series
	return store
}

// LoadTSDB returns an iterator over the stored series whose fingerprint
// matches bounds. The context, table, tenant and TSDB identifier arguments
// are ignored, and the returned error is always nil.
func (f *fakeTsdbStore) LoadTSDB(_ context.Context, _ config.DayTable, _ string, _ tsdb.Identifier, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
	matched := make([]*v1.Series, 0, len(f.series))
	for i := range f.series {
		candidate := f.series[i]
		if !bounds.Match(candidate.Fingerprint) {
			continue
		}
		matched = append(matched, candidate)
	}
	return iter.NewSliceIter(matched), nil
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
Expand Down
41 changes: 35 additions & 6 deletions pkg/bloombuild/protos/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

type GapWithBlocks struct {
type Gap struct {
Bounds v1.FingerprintBounds
Series []*v1.Series
Blocks []bloomshipper.BlockRef
}

Expand All @@ -25,18 +27,18 @@ type Task struct {
Tenant string
OwnershipBounds v1.FingerprintBounds
TSDB tsdb.SingleTenantTSDBIdentifier
Gaps []GapWithBlocks
Gaps []Gap
}

func NewTask(
table config.DayTable,
tenant string,
bounds v1.FingerprintBounds,
tsdb tsdb.SingleTenantTSDBIdentifier,
gaps []GapWithBlocks,
gaps []Gap,
) *Task {
return &Task{
ID: fmt.Sprintf("%s-%s-%s-%d-%d", table.Addr(), tenant, bounds.String(), tsdb.Checksum, len(gaps)),
ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)),

Table: table,
Tenant: tenant,
Expand All @@ -56,12 +58,25 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
return nil, fmt.Errorf("failed to parse tsdb path %s", task.Tsdb)
}

gaps := make([]GapWithBlocks, 0, len(task.Gaps))
gaps := make([]Gap, 0, len(task.Gaps))
for _, gap := range task.Gaps {
bounds := v1.FingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
}

series := make([]v1.Series, 0, len(gap.Series))
for _, s := range gap.Series {
chunks := make(v1.ChunkRefs, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, v1.ChunkRef(*c))
}
series = append(series, v1.Series{

Check failure on line 74 in pkg/bloombuild/protos/compat.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

SA4010: this result of append is never used, except maybe in other appends (staticcheck)
Fingerprint: model.Fingerprint(s.Fingerprint),
Chunks: chunks,
})
}

blocks := make([]bloomshipper.BlockRef, 0, len(gap.BlockRef))
for _, block := range gap.BlockRef {
b, err := bloomshipper.BlockRefFromKey(block)
Expand All @@ -71,7 +86,7 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {

blocks = append(blocks, b)
}
gaps = append(gaps, GapWithBlocks{
gaps = append(gaps, Gap{
Bounds: bounds,
Blocks: blocks,
})
Expand Down Expand Up @@ -102,6 +117,20 @@ func (t *Task) ToProtoTask() *ProtoTask {
blockRefs = append(blockRefs, block.String())
}

series := make([]*ProtoSeries, 0, len(gap.Series))
for _, s := range gap.Series {
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunk := logproto.ShortRef(c)
chunks = append(chunks, &chunk)
}

series = append(series, &ProtoSeries{

Check failure on line 128 in pkg/bloombuild/protos/compat.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

SA4010: this result of append is never used, except maybe in other appends (staticcheck)
Fingerprint: uint64(s.Fingerprint),
Chunks: chunks,
})
}

protoGaps = append(protoGaps, &ProtoGapWithBlocks{
Bounds: ProtoFingerprintBounds{
Min: gap.Bounds.Min,
Expand Down
Loading

0 comments on commit d07adce

Please sign in to comment.