From edd4d1f25e402a13d8ac3d37812df08dca3f3357 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 09:17:31 +0200 Subject: [PATCH 1/8] Add test --- pkg/bloombuild/builder/builder.go | 29 +++++++++- pkg/bloombuild/builder/builder_test.go | 77 +++++++++++++++++++------- 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 3a5638ab46654..78507cf4f7831 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/pkg/errors" @@ -110,12 +111,38 @@ func (b *Builder) stopping(_ error) error { } func (b *Builder) running(ctx context.Context) error { + // Try to re-establish the connection up to 5 times. + retries := backoff.New(context.Background(), backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 5, + }) + + for retries.Ongoing() { + err := b.connectAndBuild(ctx) + if err == nil { + break + } + + level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err) + retries.Wait() + } + + if err := retries.Err(); err != nil { + return fmt.Errorf("failed to connect and build: %w", err) + } + + return nil +} + +func (b *Builder) connectAndBuild( + ctx context.Context, +) error { opts, err := b.cfg.GrpcConfig.DialOption(nil, nil) if err != nil { return fmt.Errorf("failed to create grpc dial options: %w", err) } - // TODO: Wrap hereafter in retry logic conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...) if err != nil { return fmt.Errorf("failed to dial bloom planner: %w", err) diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 149e43f3234d3..f4f84bcc1387b 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "os" "testing" "time" @@ -25,7 +26,8 @@ import ( ) func Test_BuilderLoop(t *testing.T) { - logger := log.NewNopLogger() + //logger := log.NewNopLogger() + logger := log.NewLogfmtLogger(os.Stdout) schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ @@ -69,6 +71,9 @@ func Test_BuilderLoop(t *testing.T) { server, err := newFakePlannerServer(tasks) require.NoError(t, err) + // Start the server so the builder can connect and receive tasks. + server.Start() + limits := fakeLimits{} cfg := Config{ PlannerAddress: server.Addr(), @@ -87,6 +92,24 @@ func Test_BuilderLoop(t *testing.T) { err = services.StartAndAwaitRunning(context.Background(), builder) require.NoError(t, err) + // Wait for at least one task to be processed. + require.Eventually(t, func() bool { + return server.completedTasks.Load() > 0 + }, 5*time.Second, 100*time.Millisecond) + + // Right after stop it so connection is broken, and builder will retry. + server.Stop() + + // While the server is stopped, the builder should keep retrying to connect but no tasks should be processed. + // Note this is just a way to sleep while making sure no tasks are processed. + tasksProcessedSoFar := server.completedTasks.Load() + require.Never(t, func() bool { + return server.completedTasks.Load() > tasksProcessedSoFar + }, 5*time.Second, 500*time.Millisecond) + + // Now we start the server so the builder can connect and receive tasks. + server.Start() + require.Eventually(t, func() bool { return int(server.completedTasks.Load()) == len(tasks) }, 5*time.Second, 100*time.Millisecond) @@ -102,38 +125,53 @@ type fakePlannerServer struct { completedTasks atomic.Int64 shutdownCalled bool - addr string + lisAddr string grpcServer *grpc.Server } func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, err - } - server := &fakePlannerServer{ - tasks: tasks, - addr: lis.Addr().String(), - grpcServer: grpc.NewServer(), + tasks: tasks, } - protos.RegisterPlannerForBuilderServer(server.grpcServer, server) - go func() { - if err := server.grpcServer.Serve(lis); err != nil { - panic(err) - } - }() - return server, nil } func (f *fakePlannerServer) Addr() string { - return f.addr + if f.lisAddr == "" { + panic("server not started") + } + return f.lisAddr } func (f *fakePlannerServer) Stop() { - f.grpcServer.Stop() + if f.grpcServer != nil { + f.grpcServer.Stop() + } +} + +func (f *fakePlannerServer) Start() { + f.Stop() + + lisAddr := "localhost:0" + if f.lisAddr != "" { + // Reuse the same address if the server was stopped and started again. + lisAddr = f.lisAddr + } + + lis, err := net.Listen("tcp", lisAddr) + if err != nil { + panic(err) + } + f.lisAddr = lis.Addr().String() + + f.grpcServer = grpc.NewServer() + protos.RegisterPlannerForBuilderServer(f.grpcServer, f) + go func() { + if err := f.grpcServer.Serve(lis); err != nil { + panic(err) + } + }() } func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error { @@ -150,6 +188,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop return fmt.Errorf("failed to receive task response: %w", err) } f.completedTasks.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency. } // No more tasks. Wait until shutdown. From 56bf138be29f8b3214d00e97b4f96a44f268c42c Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 09:56:17 +0200 Subject: [PATCH 2/8] Fix test --- pkg/bloombuild/builder/builder.go | 32 +++++++++++++++++++++----- pkg/bloombuild/builder/builder_test.go | 16 ++++++++----- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 78507cf4f7831..e5ac5008837fa 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -112,6 +112,7 @@ func (b *Builder) stopping(_ error) error { func (b *Builder) running(ctx context.Context) error { // Try to re-establish the connection up to 5 times. + // TODO(salvacorts): Make this configurable. retries := backoff.New(context.Background(), backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, @@ -189,6 +190,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro return fmt.Errorf("failed to receive task from planner: %w", err) } + logger := log.With(b.logger, "task", protoTask.Task.Id) + b.metrics.taskStarted.Inc() start := time.Now() status := statusSuccess @@ -196,7 +199,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro newMetas, err := b.processTask(c.Context(), protoTask.Task) if err != nil { status = statusFailure - level.Error(b.logger).Log("msg", "failed to process task", "err", err) + level.Error(logger).Log("msg", "failed to process task", "err", err) } b.metrics.taskCompleted.WithLabelValues(status).Inc() @@ -224,13 +227,30 @@ func (b *Builder) notifyTaskCompletedToPlanner( CreatedMetas: metas, } - // TODO: Implement retry - if err := c.Send(&protos.BuilderToPlanner{ - BuilderID: b.ID, - Result: *result.ToProtoTaskResult(), - }); err != nil { + // We have a retry mechanism upper in the stack, but we add another one here + // to try our best to avoid losing the task result. + retries := backoff.New(context.Background(), backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 5, + }) + + for retries.Ongoing() { + if err := c.Send(&protos.BuilderToPlanner{ + BuilderID: b.ID, + Result: *result.ToProtoTaskResult(), + }); err == nil { + break + } + + level.Error(b.logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err) + retries.Wait() + } + + if err := retries.Err(); err != nil { return fmt.Errorf("failed to acknowledge task completion to planner: %w", err) } + return nil } diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index f4f84bcc1387b..3653d4184a5a2 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -94,7 +94,7 @@ func Test_BuilderLoop(t *testing.T) { // Wait for at least one task to be processed. require.Eventually(t, func() bool { - return server.completedTasks.Load() > 0 + return server.CompletedTasks() > 0 }, 5*time.Second, 100*time.Millisecond) // Right after stop it so connection is broken, and builder will retry. @@ -102,17 +102,17 @@ func Test_BuilderLoop(t *testing.T) { // While the server is stopped, the builder should keep retrying to connect but no tasks should be processed. // Note this is just a way to sleep while making sure no tasks are processed. - tasksProcessedSoFar := server.completedTasks.Load() + tasksProcessedSoFar := server.CompletedTasks() require.Never(t, func() bool { - return server.completedTasks.Load() > tasksProcessedSoFar + return server.CompletedTasks() > tasksProcessedSoFar }, 5*time.Second, 500*time.Millisecond) // Now we start the server so the builder can connect and receive tasks. server.Start() - require.Eventually(t, func() bool { - return int(server.completedTasks.Load()) == len(tasks) - }, 5*time.Second, 100*time.Millisecond) + require.Eventuallyf(t, func() bool { + return server.CompletedTasks() == len(tasks) + }, 30*time.Second, 100*time.Millisecond, "expected all tasks to be processed, got %d. Expected %d.", server.CompletedTasks(), len(tasks)) err = services.StopAndAwaitTerminated(context.Background(), builder) require.NoError(t, err) @@ -196,6 +196,10 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop return nil } +func (f *fakePlannerServer) CompletedTasks() int { + return int(f.completedTasks.Load()) +} + func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { f.shutdownCalled = true return &protos.NotifyBuilderShutdownResponse{}, nil From 45b90a61bbfc603803879ec7be37638a3ec8d490 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 10:01:05 +0200 Subject: [PATCH 3/8] Configurable backoff --- pkg/bloombuild/builder/builder.go | 17 +++-------------- pkg/bloombuild/builder/builder_test.go | 6 ++++++ pkg/bloombuild/builder/config.go | 7 +++++++ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index e5ac5008837fa..8fcae81f557d7 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -111,14 +111,8 @@ func (b *Builder) stopping(_ error) error { } func (b *Builder) running(ctx context.Context) error { - // Try to re-establish the connection up to 5 times. - // TODO(salvacorts): Make this configurable. - retries := backoff.New(context.Background(), backoff.Config{ - MinBackoff: 1 * time.Second, - MaxBackoff: 10 * time.Second, - MaxRetries: 5, - }) - + // Retry if the connection to the planner is lost. + retries := backoff.New(context.Background(), b.cfg.BackoffConfig) for retries.Ongoing() { err := b.connectAndBuild(ctx) if err == nil { @@ -229,12 +223,7 @@ func (b *Builder) notifyTaskCompletedToPlanner( // We have a retry mechanism upper in the stack, but we add another one here // to try our best to avoid losing the task result. - retries := backoff.New(context.Background(), backoff.Config{ - MinBackoff: 1 * time.Second, - MaxBackoff: 10 * time.Second, - MaxRetries: 5, - }) - + retries := backoff.New(context.Background(), b.cfg.BackoffConfig) for retries.Ongoing() { if err := c.Send(&protos.BuilderToPlanner{ BuilderID: b.ID, diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 3653d4184a5a2..cfd0906f87a2f 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" @@ -77,6 +78,11 @@ func Test_BuilderLoop(t *testing.T) { limits := fakeLimits{} cfg := Config{ PlannerAddress: server.Addr(), + BackoffConfig: backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 5, + }, } flagext.DefaultValues(&cfg.GrpcConfig) diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 25cefa4215224..274b7ae4dd466 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -3,7 +3,9 @@ package builder import ( "flag" "fmt" + "time" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcclient" ) @@ -11,12 +13,17 @@ import ( type Config struct { GrpcConfig grpcclient.Config `yaml:"grpc_config"` PlannerAddress string `yaml:"planner_address"` + BackoffConfig backoff.Config `yaml:"backoff_config"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) + + f.DurationVar(&cfg.BackoffConfig.MinBackoff, "dynamodb.min-backoff", 1*time.Second, "Minimum backoff time") + f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "dynamodb.max-backoff", 10*time.Second, "Maximum backoff time") + f.IntVar(&cfg.BackoffConfig.MaxRetries, "dynamodb.max-retries", 5, "Maximum number of times to retry an operation") } func (cfg *Config) Validate() error { From e30b37fa67223e035f37a15c0fd49c2d46b358f5 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 11:03:47 +0200 Subject: [PATCH 4/8] Fix test --- pkg/bloombuild/builder/builder_test.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index cfd0906f87a2f..60da173747655 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "os" "testing" "time" @@ -15,7 +14,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -27,8 +25,8 @@ import ( ) func Test_BuilderLoop(t *testing.T) { - //logger := log.NewNopLogger() - logger := log.NewLogfmtLogger(os.Stdout) + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ @@ -116,9 +114,9 @@ func Test_BuilderLoop(t *testing.T) { // Now we start the server so the builder can connect and receive tasks. server.Start() - require.Eventuallyf(t, func() bool { - return server.CompletedTasks() == len(tasks) - }, 30*time.Second, 100*time.Millisecond, "expected all tasks to be processed, got %d. Expected %d.", server.CompletedTasks(), len(tasks)) + require.Eventually(t, func() bool { + return server.CompletedTasks() >= len(tasks) + }, 10*time.Second, 500*time.Millisecond) err = services.StopAndAwaitTerminated(context.Background(), builder) require.NoError(t, err) @@ -128,7 +126,7 @@ func Test_BuilderLoop(t *testing.T) { type fakePlannerServer struct { tasks []*protos.ProtoTask - completedTasks atomic.Int64 + completedTasks int shutdownCalled bool lisAddr string @@ -193,7 +191,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive task response: %w", err) } - f.completedTasks.Add(1) + f.completedTasks++ time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency. } @@ -203,7 +201,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop } func (f *fakePlannerServer) CompletedTasks() int { - return int(f.completedTasks.Load()) + return f.completedTasks } func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { From d5ea7d5a5f8775ffd6bbf4bdfba7056b50ac0d2c Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 11:08:32 +0200 Subject: [PATCH 5/8] Fix config flag --- pkg/bloombuild/builder/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 274b7ae4dd466..3906cb0c52a52 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -21,9 +21,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) - f.DurationVar(&cfg.BackoffConfig.MinBackoff, "dynamodb.min-backoff", 1*time.Second, "Minimum backoff time") - f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "dynamodb.max-backoff", 10*time.Second, "Maximum backoff time") - f.IntVar(&cfg.BackoffConfig.MaxRetries, "dynamodb.max-retries", 5, "Maximum number of times to retry an operation") + f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+".backoff.min-backoff", 1*time.Second, "Minimum backoff time") + f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+".backoff.max-backoff", 10*time.Second, "Maximum backoff time") + f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+".backoff.max-retries", 5, "Maximum number of times to retry an operation") } func (cfg *Config) Validate() error { From 60ab70ed59c2021f768fbf1a3804bf8b98836204 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Jun 2024 11:14:51 +0200 Subject: [PATCH 6/8] Docs --- docs/sources/shared/configuration.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 89d4615418459..4990f0731e46c 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -377,6 +377,19 @@ bloom_build: # CLI flag: -bloom-build.builder.planner-address [planner_address: | default = ""] + backoff_config: + # Minimum backoff time + # CLI flag: -bloom-build.builder.backoff.min-backoff + [min_period: | default = 1s] + + # Maximum backoff time + # CLI flag: -bloom-build.builder.backoff.max-backoff + [max_period: | default = 10s] + + # Maximum number of times to retry an operation + # CLI flag: -bloom-build.builder.backoff.max-retries + [max_retries: | default = 5] + # Experimental: The bloom_gateway block configures the Loki bloom gateway # server, responsible for serving queries for filtering chunks based on filter # expressions. From 55f46f2f30541ef3b42faac500aca6d281152698 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 26 Jun 2024 09:49:37 +0200 Subject: [PATCH 7/8] CR feedback --- docs/sources/shared/configuration.md | 16 ++++++++-------- pkg/bloombuild/builder/builder.go | 13 ++++++++----- pkg/bloombuild/builder/builder_test.go | 19 ++++++++++--------- pkg/bloombuild/builder/config.go | 7 +------ 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 4990f0731e46c..a3966db2f9af1 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -378,17 +378,17 @@ bloom_build: [planner_address: | default = ""] backoff_config: - # Minimum backoff time - # CLI flag: -bloom-build.builder.backoff.min-backoff - [min_period: | default = 1s] + # Minimum delay when backing off. + # CLI flag: -bloom-build.builder.backoff.backoff-min-period + [min_period: | default = 100ms] - # Maximum backoff time - # CLI flag: -bloom-build.builder.backoff.max-backoff + # Maximum delay when backing off. + # CLI flag: -bloom-build.builder.backoff.backoff-max-period [max_period: | default = 10s] - # Maximum number of times to retry an operation - # CLI flag: -bloom-build.builder.backoff.max-retries - [max_retries: | default = 5] + # Number of times to backoff and retry before failing. + # CLI flag: -bloom-build.builder.backoff.backoff-retries + [max_retries: | default = 10] # Experimental: The bloom_gateway block configures the Loki bloom gateway # server, responsible for serving queries for filtering chunks based on filter diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 8fcae81f557d7..f05c1fc08fc3a 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -112,10 +112,10 @@ func (b *Builder) stopping(_ error) error { func (b *Builder) running(ctx context.Context) error { // Retry if the connection to the planner is lost. - retries := backoff.New(context.Background(), b.cfg.BackoffConfig) + retries := backoff.New(ctx, b.cfg.BackoffConfig) for retries.Ongoing() { err := b.connectAndBuild(ctx) - if err == nil { + if err == nil || errors.Is(err, context.Canceled) { break } @@ -124,6 +124,9 @@ func (b *Builder) running(ctx context.Context) error { } if err := retries.Err(); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } return fmt.Errorf("failed to connect and build: %w", err) } @@ -172,8 +175,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro } for b.State() == services.Running { - // When the planner connection closes or the builder stops, the context - // will be canceled and the loop will exit. + // When the planner connection closes, an EOF or "planner shutting down" error is returned. + // When the builder is shutting down, a gRPC context canceled error is returned. protoTask, err := c.Recv() if err != nil { if status.Code(err) == codes.Canceled { @@ -223,7 +226,7 @@ func (b *Builder) notifyTaskCompletedToPlanner( // We have a retry mechanism upper in the stack, but we add another one here // to try our best to avoid losing the task result. - retries := backoff.New(context.Background(), b.cfg.BackoffConfig) + retries := backoff.New(c.Context(), b.cfg.BackoffConfig) for retries.Ongoing() { if err := c.Send(&protos.BuilderToPlanner{ BuilderID: b.ID, diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 60da173747655..7b35ee19b2d7b 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -126,10 +127,10 @@ func Test_BuilderLoop(t *testing.T) { type fakePlannerServer struct { tasks []*protos.ProtoTask - completedTasks int + completedTasks atomic.Int64 shutdownCalled bool - lisAddr string + listenAddr string grpcServer *grpc.Server } @@ -142,10 +143,10 @@ func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) } func (f *fakePlannerServer) Addr() string { - if f.lisAddr == "" { + if f.listenAddr == "" { panic("server not started") } - return f.lisAddr + return f.listenAddr } func (f *fakePlannerServer) Stop() { @@ -158,16 +159,16 @@ func (f *fakePlannerServer) Start() { f.Stop() lisAddr := "localhost:0" - if f.lisAddr != "" { + if f.listenAddr != "" { // Reuse the same address if the server was stopped and started again. - lisAddr = f.lisAddr + lisAddr = f.listenAddr } lis, err := net.Listen("tcp", lisAddr) if err != nil { panic(err) } - f.lisAddr = lis.Addr().String() + f.listenAddr = lis.Addr().String() f.grpcServer = grpc.NewServer() protos.RegisterPlannerForBuilderServer(f.grpcServer, f) @@ -191,7 +192,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive task response: %w", err) } - f.completedTasks++ + f.completedTasks.Inc() time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency. } @@ -201,7 +202,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop } func (f *fakePlannerServer) CompletedTasks() int { - return f.completedTasks + return int(f.completedTasks.Load()) } func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 3906cb0c52a52..835f58f2d0fa9 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -3,8 +3,6 @@ package builder import ( "flag" "fmt" - "time" - "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcclient" ) @@ -20,10 +18,7 @@ type Config struct { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) - - f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+".backoff.min-backoff", 1*time.Second, "Minimum backoff time") - f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+".backoff.max-backoff", 10*time.Second, "Maximum backoff time") - f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+".backoff.max-retries", 5, "Maximum number of times to retry an operation") + cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f) } func (cfg *Config) Validate() error { From f158c9f8de6de313d6d8b3369a689ff813c7f2b0 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 26 Jun 2024 10:21:51 +0200 Subject: [PATCH 8/8] Fix test --- pkg/bloombuild/builder/builder_test.go | 11 +++++++++-- pkg/bloombuild/builder/config.go | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 7b35ee19b2d7b..764e8cb6350f8 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "testing" "time" @@ -117,7 +118,7 @@ func Test_BuilderLoop(t *testing.T) { require.Eventually(t, func() bool { return server.CompletedTasks() >= len(tasks) - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) err = services.StopAndAwaitTerminated(context.Background(), builder) require.NoError(t, err) @@ -132,6 +133,7 @@ type fakePlannerServer struct { listenAddr string grpcServer *grpc.Server + wg sync.WaitGroup } func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) { @@ -153,6 +155,8 @@ func (f *fakePlannerServer) Stop() { if f.grpcServer != nil { f.grpcServer.Stop() } + + f.wg.Wait() } func (f *fakePlannerServer) Start() { @@ -180,6 +184,9 @@ func (f *fakePlannerServer) Start() { } func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error { + f.wg.Add(1) + defer f.wg.Done() + // Receive Ready if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive ready: %w", err) @@ -192,8 +199,8 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive task response: %w", err) } - f.completedTasks.Inc() time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency. + f.completedTasks.Inc() } // No more tasks. Wait until shutdown. diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 835f58f2d0fa9..d0c553104b09e 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -3,6 +3,7 @@ package builder import ( "flag" "fmt" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcclient" )