refactor(blooms): Add RPC service for bloom-planner #13015

Merged 8 commits on May 27, 2024
Changes from 1 commit
Implement service
salvacorts committed May 22, 2024
commit d157ae8259607d07fef22043ac786b081bc4cff4
7 changes: 7 additions & 0 deletions pkg/bloombuild/planner/metrics.go
@@ -24,6 +24,7 @@ type Metrics struct {
connectedBuilders prometheus.GaugeFunc
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
taskLost prometheus.Counter

buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
@@ -65,6 +66,12 @@ func NewMetrics(
MaxAge: time.Minute,
AgeBuckets: 6,
}),
taskLost: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "task_lost_total",
Contributor:
Shouldn't this be named tasks_lost_total?

Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),

buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
156 changes: 141 additions & 15 deletions pkg/bloombuild/planner/planner.go
@@ -3,7 +3,10 @@ package planner
import (
"context"
"fmt"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/pkg/errors"
"sort"
"sync"
"time"

"github.com/go-kit/log"
@@ -22,6 +25,8 @@ import (
utillog "github.com/grafana/loki/v3/pkg/util/log"
)

var errPlannerIsNotRunning = errors.New("planner is not running")

type Planner struct {
services.Service
// Subservices manager.
@@ -38,6 +43,9 @@ type Planner struct {
tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

pendingTasksMu sync.Mutex
pendingTasks map[string]*Task

metrics *Metrics
logger log.Logger
}
@@ -108,20 +116,31 @@ func (p *Planner) running(ctx context.Context) error {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}

ticker := time.NewTicker(p.cfg.PlanningInterval)
defer ticker.Stop()
planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()

inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()

for {
select {
case <-ctx.Done():
err := ctx.Err()
level.Debug(p.logger).Log("msg", "planner context done", "err", err)
return err

case <-ticker.C:
case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
}

case <-inflightTasksTicker.C:
p.pendingTasksMu.Lock()
inflight := len(p.pendingTasks)
p.pendingTasksMu.Unlock()
Contributor:
nit: Since we use this pendingTasks every time we use the queue, would it make sense to implement a reusable sync map for pending tasks?

Contributor Author:
Good idea. Done
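For illustration only, a minimal sketch of what such a reusable pending-tasks map could look like inside the planner package, assuming the existing Task type and sync from the standard library. The name pendingTasksMap and its methods are hypothetical; the change the author actually landed in a later commit may differ.

// pendingTasksMap is a hypothetical, mutex-guarded map of in-flight tasks,
// illustrating the reviewer's suggestion; not the actual follow-up implementation.
type pendingTasksMap struct {
	mu    sync.Mutex
	tasks map[string]*Task
}

func newPendingTasksMap() *pendingTasksMap {
	return &pendingTasksMap{tasks: make(map[string]*Task)}
}

// Add registers a task under its ID.
func (m *pendingTasksMap) Add(t *Task) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.tasks[t.ID()] = t
}

// Remove forgets a task once it has been forwarded to a builder or dropped.
func (m *pendingTasksMap) Remove(t *Task) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.tasks, t.ID())
}

// Len reports how many tasks are currently pending, e.g. for the inflight metric.
func (m *pendingTasksMap) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.tasks)
}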


p.metrics.inflightRequests.Observe(float64(inflight))
}
}
}
@@ -159,19 +178,11 @@ func (p *Planner) runOne(ctx context.Context) error {
now := time.Now()
for _, gap := range gaps {
totalTasks++
task := Task{
table: w.table.Addr(),
tenant: w.tenant,
OwnershipBounds: w.ownershipRange,
tsdb: gap.tsdb,
gaps: gap.gaps,

queueTime: now,
ctx: ctx,
}

p.activeUsers.UpdateUserTimestamp(task.tenant, now)
if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
task := NewTask(w.table.Addr(), w.tenant, w.ownershipRange, gap.tsdb, gap.gaps)
task = task.WithQueueTracking(ctx, now)

if err := p.enqueueTask(task); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
}
@@ -482,3 +493,118 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan

return plans, nil
}

func (p *Planner) addPendingTask(task *Task) {
p.pendingTasksMu.Lock()
defer p.pendingTasksMu.Unlock()
p.pendingTasks[task.ID()] = task
}

func (p *Planner) removePendingTask(task *Task) {
p.pendingTasksMu.Lock()
defer p.pendingTasksMu.Unlock()
delete(p.pendingTasks, task.ID())
}

func (p *Planner) enqueueTask(task *Task) error {
p.activeUsers.UpdateUserTimestamp(task.tenant, time.Now())
return p.tasksQueue.Enqueue(task.tenant, nil, task, func() {
p.addPendingTask(task)
})
}

func (p *Planner) NotifyBuilderShutdown(
_ context.Context,
req *protos.NotifyBuilderShutdownRequest,
) (*protos.NotifyBuilderShutdownResponse, error) {
level.Debug(p.logger).Log("msg", "builder shutdown", "builder", req.BuilderID)
p.tasksQueue.UnregisterConsumerConnection(req.GetBuilderID())

return &protos.NotifyBuilderShutdownResponse{}, nil
}

func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer) error {
resp, err := builder.Recv()
if err != nil {
return fmt.Errorf("error receiving message from builder: %w", err)
}

builderID := resp.GetBuilderID()
logger := log.With(p.logger, "builder", builderID)
level.Debug(logger).Log("msg", "builder connected")

p.tasksQueue.RegisterConsumerConnection(builderID)
defer p.tasksQueue.UnregisterConsumerConnection(builderID)

lastIndex := queue.StartIndex
for p.isRunningOrStopping() {
item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
if err != nil {
return fmt.Errorf("error dequeuing task: %w", err)
}
lastIndex = idx

// This really should not happen, but log additional information before the scheduler panics.
if item == nil {
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}
task := item.(*Task)

queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())

if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.removePendingTask(task)
continue
}

if err := p.forwardTaskToBuilder(builder, builderID, task); err != nil {
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
}

return fmt.Errorf("error forwarding task to builder (%s). Task requeued: %w", builderID, err)
}

}

return errPlannerIsNotRunning
}

func (p *Planner) forwardTaskToBuilder(
builder protos.PlannerForBuilder_BuilderLoopServer,
builderID string,
task *Task,
) error {
defer p.removePendingTask(task)

// TODO: Complete Task proto definition
msg := &protos.PlannerToBuilder{
Task: &protos.Task{
Table: task.table,
Tenant: task.tenant,
Bounds: protos.FPBounds{
Min: task.OwnershipBounds.Min,
Max: task.OwnershipBounds.Max,
},
},
}

if err := builder.Send(msg); err != nil {
return fmt.Errorf("error sending task to builder (%s): %w", builderID, err)
}

// TODO(salvacorts): Implement timeout and retry for builder response.
_, err := builder.Recv()

return err
}

func (p *Planner) isRunningOrStopping() bool {
st := p.State()
return st == services.Running || st == services.Stopping
}
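For context on the other end of this stream, here is a hedged sketch of a builder-side loop against the new BuilderLoop RPC: the builder announces its ID, then alternates between receiving a task and replying so the planner can dequeue the next one. The client and message names (protos.PlannerForBuilderClient, a BuilderToPlanner message with a BuilderID field) follow standard gRPC codegen conventions and are assumptions here, not confirmed by this commit.

// Hypothetical builder-side counterpart of Planner.BuilderLoop, for illustration only.
// Assumes imports of context, fmt, and the bloombuild protos package.
func runBuilderLoop(ctx context.Context, client protos.PlannerForBuilderClient, builderID string) error {
	stream, err := client.BuilderLoop(ctx)
	if err != nil {
		return fmt.Errorf("error opening builder loop stream: %w", err)
	}

	// The first message announces this builder so the planner can register the connection.
	if err := stream.Send(&protos.BuilderToPlanner{BuilderID: builderID}); err != nil {
		return fmt.Errorf("error sending builder ID: %w", err)
	}

	for {
		// The planner forwards one task per message.
		msg, err := stream.Recv()
		if err != nil {
			return fmt.Errorf("error receiving task: %w", err)
		}

		// ... build the requested blooms for msg.GetTask() here ...
		_ = msg

		// Reply so the planner's forwardTaskToBuilder, which blocks on Recv,
		// can move on and dequeue the next task for this builder.
		if err := stream.Send(&protos.BuilderToPlanner{BuilderID: builderID}); err != nil {
			return fmt.Errorf("error sending task result: %w", err)
		}
	}
}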
25 changes: 25 additions & 0 deletions pkg/bloombuild/planner/task.go
@@ -2,6 +2,7 @@ package planner

import (
"context"
"github.com/google/uuid"
"time"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -17,6 +18,8 @@ type GapWithBlocks struct {
}

type Task struct {
id string

table string
tenant string
OwnershipBounds v1.FingerprintBounds
@@ -27,3 +30,25 @@ type Task struct {
queueTime time.Time
ctx context.Context
}

func NewTask(table, tenant string, bounds v1.FingerprintBounds, tsdb tsdb.SingleTenantTSDBIdentifier, gaps []GapWithBlocks) *Task {
return &Task{
id: uuid.NewString(),

table: table,
tenant: tenant,
OwnershipBounds: bounds,
tsdb: tsdb,
gaps: gaps,
}
}

func (t *Task) ID() string {
return t.id
}

func (t *Task) WithQueueTracking(ctx context.Context, queueTime time.Time) *Task {
t.queueTime = queueTime
t.ctx = ctx
return t
}