Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blooms): Builder retrieves tasks from planner #13046

Merged
merged 12 commits into from
May 29, 2024
Prev Previous commit
Next Next commit
Add test and fixes
  • Loading branch information
salvacorts committed May 28, 2024
commit 403c730c4e65723fb4029d4908e4c7f8baa7fb54
55 changes: 28 additions & 27 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,7 @@ func New(
return w, nil
}

func (w *Builder) starting(ctx context.Context) error {
opts, err := w.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

conn, err := grpc.DialContext(ctx, w.cfg.PlannerAddress, opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}

w.client = protos.NewPlannerForBuilderClient(conn)
func (w *Builder) starting(_ context.Context) error {
w.metrics.running.Set(1)
return nil
}
Expand All @@ -78,15 +67,22 @@ func (w *Builder) stopping(_ error) error {
}

func (w *Builder) running(ctx context.Context) error {
opts, err := w.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

c, err := w.client.BuilderLoop(ctx)
// TODO: Wrap hereafter in retry logic
conn, err := grpc.DialContext(ctx, w.cfg.PlannerAddress, opts...)
if err != nil {
return fmt.Errorf("failed to start builder loop: %w", err)
return fmt.Errorf("failed to dial bloom planner: %w", err)
}

// Send ready message to planner
if err := c.Send(&protos.BuilderToPlanner{BuilderID: w.ID}); err != nil {
return fmt.Errorf("failed to send ready message to planner: %w", err)
w.client = protos.NewPlannerForBuilderClient(conn)

c, err := w.client.BuilderLoop(ctx)
if err != nil {
return fmt.Errorf("failed to start builder loop: %w", err)
}

// Start processing tasks from planner
Expand All @@ -98,19 +94,21 @@ func (w *Builder) running(ctx context.Context) error {
}

func (w *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
for {
ctx := c.Context()
if err := ctx.Err(); err != nil {
if !errors.Is(err, context.Canceled) {
return fmt.Errorf("builder loop context error: %w", err)
}

level.Debug(w.logger).Log("msg", "builder loop context canceled")
return nil
}
// Send ready message to planner
if err := c.Send(&protos.BuilderToPlanner{BuilderID: w.ID}); err != nil {
return fmt.Errorf("failed to send ready message to planner: %w", err)
}

for w.State() == services.Running {
// When the planner connection closes or the builder stops, the context
// will be canceled and the loop will exit.
protoTask, err := c.Recv()
if err != nil {
if errors.Is(c.Context().Err(), context.Canceled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should check err for context.Canceled, or check whether c.Context() == nil before calling c.Recv().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the connection context is canceled, c.Recv() returns immediately when it is called.
(I originally had it as you suggest, but I noticed this behaviour while testing and decided to remove the check for simplicity.)

level.Debug(w.logger).Log("msg", "builder loop context canceled")
return nil
}

return fmt.Errorf("failed to receive task from planner: %w", err)
}

Expand All @@ -132,6 +130,9 @@ func (w *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
return fmt.Errorf("failed to notify task completion to planner: %w", err)
}
}

level.Debug(w.logger).Log("msg", "builder loop stopped")
return nil
}

func (w *Builder) notifyTaskCompletedToPlanner(c protos.PlannerForBuilder_BuilderLoopClient, err error) error {
Expand Down
127 changes: 127 additions & 0 deletions pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package builder

import (
"context"
"fmt"
"github.com/grafana/dskit/flagext"
"net"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func Test_BuilderLoop(t *testing.T) {
logger := log.NewNopLogger()

tasks := make([]*protos.ProtoTask, 256)
for i := range tasks {
tasks[i] = &protos.ProtoTask{
Id: fmt.Sprintf("task-%d", i),
}
}

server, err := newFakePlannerServer(tasks)
require.NoError(t, err)

cfg := Config{
PlannerAddress: server.Addr(),
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, logger, prometheus.DefaultRegisterer)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
require.NoError(t, err)

server.Stop()
})

err = services.StartAndAwaitRunning(context.Background(), builder)
require.NoError(t, err)

require.Eventually(t, func() bool {
return server.completedTasks == len(tasks)
}, 5*time.Second, 100*time.Millisecond)

err = services.StopAndAwaitTerminated(context.Background(), builder)
require.NoError(t, err)

require.True(t, server.shutdownCalled)
}

// fakePlannerServer is a test double for the planner side of the
// PlannerForBuilder gRPC service. It hands a fixed list of tasks to a builder
// and records what the builder reports back.
//
// NOTE(review): completedTasks and shutdownCalled are written from the gRPC
// handler methods and read directly by Test_BuilderLoop, with no
// synchronization visible in this file. Consider guarding them (mutex or
// atomics) and running the test with -race.
type fakePlannerServer struct {
// tasks is the list sent to the builder, one at a time and in order, by BuilderLoop.
tasks []*protos.ProtoTask
// completedTasks is incremented by BuilderLoop after each task response is received.
completedTasks int
// shutdownCalled is set to true when NotifyBuilderShutdown is invoked.
shutdownCalled bool

// addr is the listener address captured in newFakePlannerServer.
addr string
// grpcServer is the gRPC server this fake is registered on.
grpcServer *grpc.Server
// stop is closed by Stop; BuilderLoop waits on it after sending all tasks.
stop chan struct{}
}

// newFakePlannerServer creates a fake planner that will hand out the given
// tasks, registers it on a new gRPC server, and starts serving on a TCP
// listener bound to "localhost:0" in a background goroutine.
//
// It returns an error only if the listener cannot be created. An error
// returned by Serve in the background goroutine causes a panic.
func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) {
	listener, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, err
	}

	grpcServer := grpc.NewServer()
	fake := &fakePlannerServer{
		tasks:      tasks,
		addr:       listener.Addr().String(),
		grpcServer: grpcServer,
		stop:       make(chan struct{}),
	}
	protos.RegisterPlannerForBuilderServer(grpcServer, fake)

	// Serve until the gRPC server is stopped; surface unexpected failures loudly.
	go func() {
		serveErr := grpcServer.Serve(listener)
		if serveErr != nil {
			panic(serveErr)
		}
	}()

	return fake, nil
}

// Addr returns the stored addr field, which newFakePlannerServer sets to the
// address of the listener the fake planner serves on.
func (f *fakePlannerServer) Addr() string {
return f.addr
}

// Stop closes the stop channel, which releases any BuilderLoop handler waiting
// on it, and then stops the gRPC server.
//
// Stop may be called more than once sequentially: the stop channel is closed
// only on the first call, because closing an already-closed channel panics.
// It is not safe to call Stop from multiple goroutines at the same time.
func (f *fakePlannerServer) Stop() {
	select {
	case <-f.stop:
		// Already closed by an earlier Stop call; nothing to signal.
	default:
		close(f.stop)
	}
	f.grpcServer.Stop()
}

func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error {
// Receive Ready
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive ready: %w", err)
}

for _, task := range f.tasks {
if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil {
return fmt.Errorf("failed to send task: %w", err)
}
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive task response: %w", err)
}
f.completedTasks++
}

// No more tasks. Emulate waiting for requeue until shutdown.
select {

Check failure on line 118 in pkg/bloombuild/builder/builder_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case <-f.stop:
return nil
}
}

// NotifyBuilderShutdown records that the shutdown notification RPC was invoked
// by setting shutdownCalled, and returns an empty response with a nil error.
// The context and the request are ignored.
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
f.shutdownCalled = true
return &protos.NotifyBuilderShutdownResponse{}, nil
}
Loading