diff --git a/e2e/service.go b/e2e/service.go index 97e0c93..6f53070 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -3,6 +3,7 @@ package e2e import ( "context" "fmt" + "strings" "time" "github.com/cloudhut/kminion/v2/kafka" @@ -73,6 +74,8 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner())) // Create kafka service and check if client can successfully connect to Kafka cluster + logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", + zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ","))) client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts) if err != nil { return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err) @@ -144,7 +147,6 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k // Start starts the service (wow) func (s *Service) Start(ctx context.Context) error { - // Ensure topic exists and is configured correctly if err := s.validateManagementTopic(ctx); err != nil { return fmt.Errorf("could not validate end-to-end topic: %w", err) diff --git a/kafka/service.go b/kafka/service.go index 3fea151..a12ca46 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -54,12 +54,14 @@ func (s *Service) CreateAndTestClient(ctx context.Context, l *zap.Logger, opts [ return client, nil } +// Brokers returns list of brokers this service is connecting to +func (s *Service) Brokers() []string { + return s.cfg.Brokers +} + // testConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be // returned if connecting fails. func (s *Service) testConnection(client *kgo.Client, ctx context.Context) error { - s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", - zap.String("seed_brokers", strings.Join(s.cfg.Brokers, ","))) - req := kmsg.MetadataRequest{ Topics: nil, } diff --git a/minion/service.go b/minion/service.go index 131ced2..84efae9 100644 --- a/minion/service.go +++ b/minion/service.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "regexp" + "strings" "sync" "time" @@ -52,6 +53,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics kgo.ConsumeTopics("__consumer_offsets")) } + logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", + zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ","))) + client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts) if err != nil { return nil, fmt.Errorf("failed to create kafka client: %w", err)