From ab7af0575dc84070a11d4b589fd5c8a223baf536 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 29 May 2024 14:18:44 +0200 Subject: [PATCH] refactor(blooms): Builder retrieves tasks from planner (#13046) --- docs/sources/shared/configuration.md | 10 ++ pkg/bloombuild/builder/builder.go | 137 +++++++++++++++++++++++-- pkg/bloombuild/builder/builder_test.go | 123 ++++++++++++++++++++++ pkg/bloombuild/builder/config.go | 23 ++++- pkg/bloombuild/builder/metrics.go | 33 ++++++ pkg/bloombuild/planner/planner.go | 10 +- pkg/bloombuild/protos/service.pb.go | 87 +++++++++++++--- pkg/bloombuild/protos/service.proto | 1 + 8 files changed, 392 insertions(+), 32 deletions(-) create mode 100644 pkg/bloombuild/builder/builder_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index c0e4bdeeca4d2..a8a4a7df720e6 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -356,6 +356,15 @@ bloom_build: [max_queued_tasks_per_tenant: | default = 30000] builder: + # The grpc_client block configures the gRPC client used to communicate + # between a client and server component in Loki. + # The CLI flags prefix for this block configuration is: + # bloom-build.builder.grpc + [grpc_config: ] + + # Hostname (and port) of the bloom planner + # CLI flag: -bloom-build.builder.planner-address + [planner_address: | default = ""] # Experimental: The bloom_gateway block configures the Loki bloom gateway # server, responsible for serving queries for filtering chunks based on filter @@ -2335,6 +2344,7 @@ The `gcs_storage_config` block configures the connection to Google Cloud Storage The `grpc_client` block configures the gRPC client used to communicate between a client and server component in Loki. The supported CLI flags `` used to reference this configuration block are: - `bigtable` +- `bloom-build.builder.grpc` - `bloom-gateway-client.grpc` - `boltdb.shipper.index-gateway-client.grpc` - `frontend.grpc-client-config` diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 098e7d6d83f00..aa1455fc38c06 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -2,49 +2,164 @@ package builder import ( "context" + "fmt" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" utillog "github.com/grafana/loki/v3/pkg/util/log" ) -type Worker struct { +type Builder struct { services.Service + ID string + cfg Config metrics *Metrics logger log.Logger + + client protos.PlannerForBuilderClient } func New( cfg Config, logger log.Logger, r prometheus.Registerer, -) (*Worker, error) { +) (*Builder, error) { utillog.WarnExperimentalUse("Bloom Builder", logger) - w := &Worker{ + b := &Builder{ + ID: uuid.NewString(), cfg: cfg, metrics: NewMetrics(r), logger: logger, } - w.Service = services.NewBasicService(w.starting, w.running, w.stopping) - return w, nil + b.Service = services.NewBasicService(b.starting, b.running, b.stopping) + return b, nil +} + +func (b *Builder) starting(_ context.Context) error { + b.metrics.running.Set(1) + return nil } -func (w *Worker) starting(_ context.Context) (err error) { - w.metrics.running.Set(1) - return err +func (b *Builder) stopping(_ error) error { + if b.client != nil { + req := &protos.NotifyBuilderShutdownRequest{ + BuilderID: b.ID, + } + if _, err := b.client.NotifyBuilderShutdown(context.Background(), req); err != nil { + level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err) + } + } + + b.metrics.running.Set(0) + return nil } -func (w *Worker) stopping(_ error) error { - w.metrics.running.Set(0) +func (b *Builder) running(ctx context.Context) error { + opts, err := b.cfg.GrpcConfig.DialOption(nil, nil) + if err != nil { + return fmt.Errorf("failed to create grpc dial options: %w", err) + } + + // TODO: Wrap hereafter in retry logic + conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...) + if err != nil { + return fmt.Errorf("failed to dial bloom planner: %w", err) + } + + b.client = protos.NewPlannerForBuilderClient(conn) + + c, err := b.client.BuilderLoop(ctx) + if err != nil { + return fmt.Errorf("failed to start builder loop: %w", err) + } + + // Start processing tasks from planner + if err := b.builderLoop(c); err != nil { + return fmt.Errorf("builder loop failed: %w", err) + } + return nil } -func (w *Worker) running(_ context.Context) error { +func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error { + // Send ready message to planner + if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil { + return fmt.Errorf("failed to send ready message to planner: %w", err) + } + + for b.State() == services.Running { + // When the planner connection closes or the builder stops, the context + // will be canceled and the loop will exit. + protoTask, err := c.Recv() + if err != nil { + if errors.Is(c.Context().Err(), context.Canceled) { + level.Debug(b.logger).Log("msg", "builder loop context canceled") + return nil + } + + return fmt.Errorf("failed to receive task from planner: %w", err) + } + + b.metrics.taskStarted.Inc() + start := time.Now() + status := statusSuccess + + err = b.processTask(c.Context(), protoTask.Task) + if err != nil { + status = statusFailure + level.Error(b.logger).Log("msg", "failed to process task", "err", err) + } + + b.metrics.taskCompleted.WithLabelValues(status).Inc() + b.metrics.taskDuration.WithLabelValues(status).Observe(time.Since(start).Seconds()) + + // Acknowledge task completion to planner + if err = b.notifyTaskCompletedToPlanner(c, err); err != nil { + return fmt.Errorf("failed to notify task completion to planner: %w", err) + } + } + + level.Debug(b.logger).Log("msg", "builder loop stopped") + return nil +} + +func (b *Builder) notifyTaskCompletedToPlanner(c protos.PlannerForBuilder_BuilderLoopClient, err error) error { + var errMsg string + if err != nil { + errMsg = err.Error() + } + + // TODO: Implement retry + if err := c.Send(&protos.BuilderToPlanner{ + BuilderID: b.ID, + Error: errMsg, + }); err != nil { + return fmt.Errorf("failed to acknowledge task completion to planner: %w", err) + } + return nil +} + +func (b *Builder) processTask(_ context.Context, protoTask *protos.ProtoTask) error { + task, err := protos.FromProtoTask(protoTask) + if err != nil { + return fmt.Errorf("failed to convert proto task to task: %w", err) + } + + level.Debug(b.logger).Log("msg", "received task", "task", task.ID) + + // TODO: Implement task processing + return nil } diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go new file mode 100644 index 0000000000000..ba2c7eae30bb0 --- /dev/null +++ b/pkg/bloombuild/builder/builder_test.go @@ -0,0 +1,123 @@ +package builder + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/grafana/loki/v3/pkg/bloombuild/protos" +) + +func Test_BuilderLoop(t *testing.T) { + logger := log.NewNopLogger() + + tasks := make([]*protos.ProtoTask, 256) + for i := range tasks { + tasks[i] = &protos.ProtoTask{ + Id: fmt.Sprintf("task-%d", i), + } + } + + server, err := newFakePlannerServer(tasks) + require.NoError(t, err) + + cfg := Config{ + PlannerAddress: server.Addr(), + } + flagext.DefaultValues(&cfg.GrpcConfig) + + builder, err := New(cfg, logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), builder) + require.NoError(t, err) + + server.Stop() + }) + + err = services.StartAndAwaitRunning(context.Background(), builder) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return server.completedTasks == len(tasks) + }, 5*time.Second, 100*time.Millisecond) + + err = services.StopAndAwaitTerminated(context.Background(), builder) + require.NoError(t, err) + + require.True(t, server.shutdownCalled) +} + +type fakePlannerServer struct { + tasks []*protos.ProtoTask + completedTasks int + shutdownCalled bool + + addr string + grpcServer *grpc.Server +} + +func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + + server := &fakePlannerServer{ + tasks: tasks, + addr: lis.Addr().String(), + grpcServer: grpc.NewServer(), + } + + protos.RegisterPlannerForBuilderServer(server.grpcServer, server) + go func() { + if err := server.grpcServer.Serve(lis); err != nil { + panic(err) + } + }() + + return server, nil +} + +func (f *fakePlannerServer) Addr() string { + return f.addr +} + +func (f *fakePlannerServer) Stop() { + f.grpcServer.Stop() +} + +func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error { + // Receive Ready + if _, err := srv.Recv(); err != nil { + return fmt.Errorf("failed to receive ready: %w", err) + } + + for _, task := range f.tasks { + if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil { + return fmt.Errorf("failed to send task: %w", err) + } + if _, err := srv.Recv(); err != nil { + return fmt.Errorf("failed to receive task response: %w", err) + } + f.completedTasks++ + } + + // No more tasks. Wait until shutdown. + <-srv.Context().Done() + return nil +} + +func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { + f.shutdownCalled = true + return &protos.NotifyBuilderShutdownResponse{}, nil +} diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index ac282ccf95ebb..122288c120416 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -1,18 +1,33 @@ package builder -import "flag" +import ( + "flag" + "fmt" + + "github.com/grafana/dskit/grpcclient" +) // Config configures the bloom-builder component. type Config struct { - // TODO: Add config + GrpcConfig grpcclient.Config `yaml:"grpc_config"` + PlannerAddress string `yaml:"planner_address"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. -func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) { - // TODO: Register flags with flagsPrefix +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") + cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) } func (cfg *Config) Validate() error { + if cfg.PlannerAddress == "" { + return fmt.Errorf("planner address is required") + } + + if err := cfg.GrpcConfig.Validate(); err != nil { + return fmt.Errorf("grpc config is invalid: %w", err) + } + return nil } diff --git a/pkg/bloombuild/builder/metrics.go b/pkg/bloombuild/builder/metrics.go index e8f46fa025080..90a59cd4d402a 100644 --- a/pkg/bloombuild/builder/metrics.go +++ b/pkg/bloombuild/builder/metrics.go @@ -8,10 +8,17 @@ import ( const ( metricsNamespace = "loki" metricsSubsystem = "bloombuilder" + + statusSuccess = "success" + statusFailure = "failure" ) type Metrics struct { running prometheus.Gauge + + taskStarted prometheus.Counter + taskCompleted *prometheus.CounterVec + taskDuration *prometheus.HistogramVec } func NewMetrics(r prometheus.Registerer) *Metrics { @@ -22,5 +29,31 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "running", Help: "Value will be 1 if the bloom builder is currently running on this instance", }), + + taskStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "task_started_total", + Help: "Total number of task started", + }), + taskCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "task_completed_total", + Help: "Total number of task completed", + }, []string{"status"}), + taskDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "task_duration_seconds", + Help: "Time spent processing a task.", + // Buckets in seconds: + Buckets: append( + // 1s --> 1h (steps of 10 minutes) + prometheus.LinearBuckets(1, 600, 6), + // 1h --> 24h (steps of 1 hour) + prometheus.LinearBuckets(3600, 3600, 24)..., + ), + }, []string{"status"}), } } diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index dfb6fea80cc3f..7a7954c67e05f 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -607,9 +607,15 @@ func (p *Planner) forwardTaskToBuilder( } // TODO(salvacorts): Implement timeout and retry for builder response. - _, err := builder.Recv() + res, err := builder.Recv() + if err != nil { + return fmt.Errorf("error receiving response from builder (%s): %w", builderID, err) + } + if res.GetError() != "" { + return fmt.Errorf("error processing task in builder (%s): %s", builderID, res.GetError()) + } - return err + return nil } func (p *Planner) isRunningOrStopping() bool { diff --git a/pkg/bloombuild/protos/service.pb.go b/pkg/bloombuild/protos/service.pb.go index 91684dd90ef8e..257206f275823 100644 --- a/pkg/bloombuild/protos/service.pb.go +++ b/pkg/bloombuild/protos/service.pb.go @@ -31,6 +31,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type BuilderToPlanner struct { BuilderID string `protobuf:"bytes,1,opt,name=builderID,proto3" json:"builderID,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` } func (m *BuilderToPlanner) Reset() { *m = BuilderToPlanner{} } @@ -72,6 +73,13 @@ func (m *BuilderToPlanner) GetBuilderID() string { return "" } +func (m *BuilderToPlanner) GetError() string { + if m != nil { + return m.Error + } + return "" +} + type PlannerToBuilder struct { Task *ProtoTask `protobuf:"bytes,1,opt,name=task,proto3" json:"task,omitempty"` } @@ -205,27 +213,28 @@ func init() { } var fileDescriptor_89de33e08b859356 = []byte{ - // 323 bytes of a gzipped FileDescriptorProto + // 339 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x2e, 0xc8, 0x4e, 0xd7, 0x4f, 0xca, 0xc9, 0xcf, 0xcf, 0x4d, 0x2a, 0xcd, 0xcc, 0x49, 0xd1, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x2f, 0xd6, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x03, 0x73, 0x85, 0xd8, 0x20, 0xa2, 0x52, 0x22, 0xe9, 0xf9, 0xe9, 0xf9, 0x60, 0xb6, 0x3e, 0x88, 0x05, 0x91, 0x95, 0x52, 0xc4, 0x6e, - 0x44, 0x49, 0x65, 0x41, 0x6a, 0x31, 0x44, 0x89, 0x92, 0x01, 0x97, 0x80, 0x13, 0x48, 0x2e, 0xb5, + 0x44, 0x49, 0x65, 0x41, 0x6a, 0x31, 0x44, 0x89, 0x92, 0x1b, 0x97, 0x80, 0x13, 0x48, 0x2e, 0xb5, 0x28, 0x24, 0x3f, 0x20, 0x27, 0x31, 0x2f, 0x2f, 0xb5, 0x48, 0x48, 0x86, 0x8b, 0x33, 0x09, 0x22, - 0xe6, 0xe9, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x84, 0x10, 0x50, 0xb2, 0xe4, 0x12, 0x80, - 0x2a, 0x0c, 0xc9, 0x87, 0x6a, 0x15, 0x52, 0xe5, 0x62, 0x29, 0x49, 0x2c, 0xce, 0x06, 0x2b, 0xe6, - 0x36, 0x12, 0x84, 0x98, 0x5d, 0xac, 0x17, 0x00, 0xa2, 0x42, 0x12, 0x8b, 0xb3, 0x83, 0xc0, 0xd2, - 0x4a, 0x36, 0x5c, 0x32, 0x7e, 0xf9, 0x25, 0x99, 0x69, 0x95, 0x50, 0x7d, 0xc1, 0x19, 0xa5, 0x25, - 0x29, 0xf9, 0xe5, 0x79, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x04, 0x2c, 0x96, 0xe7, 0x92, + 0xe6, 0xe9, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x84, 0x10, 0x10, 0x12, 0xe1, 0x62, 0x4d, + 0x2d, 0x2a, 0xca, 0x2f, 0x92, 0x60, 0x02, 0xcb, 0x40, 0x38, 0x4a, 0x96, 0x5c, 0x02, 0x50, 0xed, + 0x21, 0xf9, 0x50, 0x03, 0x85, 0x54, 0xb9, 0x58, 0x4a, 0x12, 0x8b, 0xb3, 0xc1, 0x46, 0x70, 0x1b, + 0x09, 0x42, 0x6c, 0x2c, 0xd6, 0x0b, 0x00, 0x51, 0x21, 0x89, 0xc5, 0xd9, 0x41, 0x60, 0x69, 0x25, + 0x1b, 0x2e, 0x19, 0xbf, 0xfc, 0x92, 0xcc, 0xb4, 0x4a, 0xa8, 0xbe, 0xe0, 0x8c, 0xd2, 0x92, 0x94, + 0xfc, 0xf2, 0xbc, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0xfc, 0xce, 0x51, 0x92, 0xe7, 0x92, 0xc5, 0xa1, 0xbb, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0xd5, 0xe8, 0x08, 0x23, 0x97, 0x20, 0xd4, 0x69, 0x6e, 0xf9, 0x45, 0x30, 0xb7, 0xb9, 0x73, 0x71, 0x43, 0x99, 0x3e, 0xf9, 0xf9, 0x05, 0x42, 0x12, - 0x30, 0xc7, 0xa1, 0x7b, 0x5b, 0x0a, 0x2e, 0x83, 0xee, 0x3d, 0x25, 0x06, 0x0d, 0x46, 0x03, 0x46, - 0xa1, 0x34, 0x2e, 0x51, 0xac, 0xf6, 0x0b, 0xa9, 0xc0, 0x34, 0xe2, 0xf3, 0x9c, 0x94, 0x2a, 0x01, - 0x55, 0x10, 0x4f, 0x28, 0x31, 0x38, 0xd9, 0x5c, 0x78, 0x28, 0xc7, 0x70, 0xe3, 0xa1, 0x1c, 0xc3, - 0x87, 0x87, 0x72, 0x8c, 0x0d, 0x8f, 0xe4, 0x18, 0x57, 0x3c, 0x92, 0x63, 0x3c, 0xf1, 0x48, 0x8e, - 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x5f, 0x3c, 0x92, 0x63, 0xf8, 0xf0, 0x48, - 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b, 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x88, 0x82, - 0xa6, 0x84, 0x24, 0x08, 0x6d, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x28, 0x86, 0x3f, 0xfe, 0x3f, + 0x30, 0xc7, 0xa1, 0x07, 0x86, 0x14, 0x5c, 0x06, 0xdd, 0x7b, 0x4a, 0x0c, 0x1a, 0x8c, 0x06, 0x8c, + 0x42, 0x69, 0x5c, 0xa2, 0x58, 0xed, 0x17, 0x52, 0x81, 0x69, 0xc4, 0xe7, 0x39, 0x29, 0x55, 0x02, + 0xaa, 0x20, 0x9e, 0x50, 0x62, 0x70, 0xb2, 0xb9, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, + 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, + 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, + 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x05, + 0x4d, 0x1f, 0x49, 0x10, 0xda, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x9f, 0x41, 0x32, 0x50, 0x55, 0x02, 0x00, 0x00, } @@ -251,6 +260,9 @@ func (this *BuilderToPlanner) Equal(that interface{}) bool { if this.BuilderID != that1.BuilderID { return false } + if this.Error != that1.Error { + return false + } return true } func (this *PlannerToBuilder) Equal(that interface{}) bool { @@ -326,9 +338,10 @@ func (this *BuilderToPlanner) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 6) s = append(s, "&protos.BuilderToPlanner{") s = append(s, "BuilderID: "+fmt.Sprintf("%#v", this.BuilderID)+",\n") + s = append(s, "Error: "+fmt.Sprintf("%#v", this.Error)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -541,6 +554,13 @@ func (m *BuilderToPlanner) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintService(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0x12 + } if len(m.BuilderID) > 0 { i -= len(m.BuilderID) copy(dAtA[i:], m.BuilderID) @@ -660,6 +680,10 @@ func (m *BuilderToPlanner) Size() (n int) { if l > 0 { n += 1 + l + sovService(uint64(l)) } + l = len(m.Error) + if l > 0 { + n += 1 + l + sovService(uint64(l)) + } return n } @@ -710,6 +734,7 @@ func (this *BuilderToPlanner) String() string { } s := strings.Join([]string{`&BuilderToPlanner{`, `BuilderID:` + fmt.Sprintf("%v", this.BuilderID) + `,`, + `Error:` + fmt.Sprintf("%v", this.Error) + `,`, `}`, }, "") return s @@ -812,6 +837,38 @@ func (m *BuilderToPlanner) Unmarshal(dAtA []byte) error { } m.BuilderID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowService + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthService + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthService + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipService(dAtA[iNdEx:]) diff --git a/pkg/bloombuild/protos/service.proto b/pkg/bloombuild/protos/service.proto index e061684c41bea..1d172f31ec9b9 100644 --- a/pkg/bloombuild/protos/service.proto +++ b/pkg/bloombuild/protos/service.proto @@ -17,6 +17,7 @@ service PlannerForBuilder { message BuilderToPlanner { string builderID = 1; + string error = 2; } message PlannerToBuilder {