Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blooms): Add RPC service for bloom-planner #13015

Merged
merged 8 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use protos
  • Loading branch information
salvacorts committed May 22, 2024
commit 1e2e9bdf7bef9523ec06cfece777678e8aa5fc86
47 changes: 21 additions & 26 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package planner
import (
"context"
"fmt"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/pkg/errors"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -179,8 +179,11 @@ func (p *Planner) runOne(ctx context.Context) error {
for _, gap := range gaps {
totalTasks++

task := NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps)
task = task.WithQueueTracking(ctx, now)
task := NewQueueTask(
ctx,
now,
protos.NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps),
)

if err := p.enqueueTask(task); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
Expand Down Expand Up @@ -337,7 +340,7 @@ func (p *Planner) findGapsForBounds(
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []GapWithBlocks
gaps []protos.GapWithBlocks
}

func (p *Planner) findOutdatedGaps(
Expand Down Expand Up @@ -431,12 +434,12 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]GapWithBlocks, 0, len(idx.gaps)),
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := GapWithBlocks{
bounds: gap,
planGap := protos.GapWithBlocks{
Bounds: gap,
}

for _, meta := range metas {
Expand All @@ -453,18 +456,18 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.blocks = append(planGap.blocks, block)
planGap.Blocks = append(planGap.Blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.blocks, func(i, j int) bool {
return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
v1.NewSliceIter[bloomshipper.BlockRef](
planGap.blocks,
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
Expand All @@ -483,7 +486,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.blocks = deduped
planGap.Blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}
Expand All @@ -497,18 +500,18 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan
func (p *Planner) addPendingTask(task *Task) {
p.pendingTasksMu.Lock()
defer p.pendingTasksMu.Unlock()
p.pendingTasks[task.ID()] = task
p.pendingTasks[task.ID] = task
}

func (p *Planner) removePendingTask(task *Task) {
p.pendingTasksMu.Lock()
defer p.pendingTasksMu.Unlock()
delete(p.pendingTasks, task.ID())
delete(p.pendingTasks, task.ID)
}

func (p *Planner) enqueueTask(task *Task) error {
p.activeUsers.UpdateUserTimestamp(task.tenant, time.Now())
return p.tasksQueue.Enqueue(task.tenant, nil, task, func() {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
p.addPendingTask(task)
})
}
Expand Down Expand Up @@ -582,16 +585,8 @@ func (p *Planner) forwardTaskToBuilder(
) error {
defer p.removePendingTask(task)

// TODO: Complete Task proto definition
msg := &protos.PlannerToBuilder{
Task: &protos.Task{
Table: task.table,
Tenant: task.tenant,
Bounds: protos.FPBounds{
Min: task.OwnershipBounds.Min,
Max: task.OwnershipBounds.Max,
},
},
Task: task.ToProtoTask(),
}

if err := builder.Send(msg); err != nil {
Expand Down
43 changes: 6 additions & 37 deletions pkg/bloombuild/planner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,22 @@ package planner

import (
"context"
"github.com/google/uuid"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"time"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

// TODO: Extract this definiton to a proto file at pkg/bloombuild/protos/protos.proto

type GapWithBlocks struct {
bounds v1.FingerprintBounds
blocks []bloomshipper.BlockRef
}

type Task struct {
id string

table string
tenant string
OwnershipBounds v1.FingerprintBounds
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []GapWithBlocks
*protos.Task

// Tracking
queueTime time.Time
ctx context.Context
}

func NewTask(table, tenant string, bounds v1.FingerprintBounds, tsdb tsdb.SingleTenantTSDBIdentifier, gaps []GapWithBlocks) *Task {
func NewQueueTask(ctx context.Context, queueTime time.Time, task *protos.Task) *Task {
return &Task{
id: uuid.NewString(),

table: table,
tenant: tenant,
OwnershipBounds: bounds,
tsdb: tsdb,
gaps: gaps,
Task: task,
ctx: ctx,
queueTime: queueTime,
}
}

func (t *Task) ID() string {
return t.id
}

func (t *Task) WithQueueTracking(ctx context.Context, queueTime time.Time) *Task {
t.queueTime = queueTime
t.ctx = ctx
return t
}
109 changes: 109 additions & 0 deletions pkg/bloombuild/protos/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package protos

import (
"fmt"

"github.com/google/uuid"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

type GapWithBlocks struct {
Bounds v1.FingerprintBounds
Blocks []bloomshipper.BlockRef
}

type Task struct {
ID string

Table string
Tenant string
OwnershipBounds v1.FingerprintBounds
TSDB tsdb.SingleTenantTSDBIdentifier
Gaps []GapWithBlocks
}

func NewTask(table, tenant string, bounds v1.FingerprintBounds, tsdb tsdb.SingleTenantTSDBIdentifier, gaps []GapWithBlocks) *Task {
return &Task{
ID: uuid.NewString(),

Table: table,
Tenant: tenant,
OwnershipBounds: bounds,
TSDB: tsdb,
Gaps: gaps,
}
}

// TODO: Use it in the builder to parse the task
func FromProtoTask(task ProtoTask) (*Task, error) {
tsdbRef, ok := tsdb.ParseSingleTenantTSDBPath(task.Tsdb)
if !ok {
return nil, fmt.Errorf("failed to parse tsdb path %s", task.Tsdb)
}

gaps := make([]GapWithBlocks, 0, len(task.Gaps))
for _, gap := range task.Gaps {
bounds := v1.FingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
}
blocks := make([]bloomshipper.BlockRef, 0, len(gap.BlockRef))
for _, block := range gap.BlockRef {
b, err := bloomshipper.BlockRefFromKey(block)
if err != nil {
return nil, fmt.Errorf("failed to parse block ref %s: %w", block, err)
}

blocks = append(blocks, b)
}
gaps = append(gaps, GapWithBlocks{
Bounds: bounds,
Blocks: blocks,
})
}

return &Task{
ID: task.Id,
Table: task.Table,
Tenant: task.Tenant,
OwnershipBounds: v1.FingerprintBounds{
Min: task.Bounds.Min,
Max: task.Bounds.Max,
},
TSDB: tsdbRef,
Gaps: gaps,
}, nil
}

func (t *Task) ToProtoTask() *ProtoTask {
protoGaps := make([]*ProtoGapWithBlocks, 0, len(t.Gaps))
for _, gap := range t.Gaps {
blockRefs := make([]string, 0, len(gap.Blocks))
for _, block := range gap.Blocks {
blockRefs = append(blockRefs, block.String())
}

protoGaps = append(protoGaps, &ProtoGapWithBlocks{
Bounds: ProtoFingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
},
BlockRef: blockRefs,
})
}

return &ProtoTask{
Id: t.ID,
Table: t.Table,
Tenant: t.Tenant,
Bounds: ProtoFingerprintBounds{
Min: t.OwnershipBounds.Min,
Max: t.OwnershipBounds.Max,
},
Tsdb: t.TSDB.Path(),
Gaps: protoGaps,
}
}
40 changes: 20 additions & 20 deletions pkg/bloombuild/protos/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/bloombuild/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ message BuilderToPlanner {
}

message PlannerToBuilder {
Task task = 1;
ProtoTask task = 1;
}

message NotifyBuilderShutdownRequest {
Expand Down
Loading
Loading