From 7fa55b8d6dc5fd0c0a4ebd1719c40957d8e162ef Mon Sep 17 00:00:00 2001 From: Tim Swanson Date: Wed, 5 Oct 2022 22:34:54 -0400 Subject: [PATCH] Remove default kafka bootstrap consumer behavior only create a kafka consumer if explicitly set --- cmd/allspark/main.go | 46 ++++++++++++++++++++------------------ internal/kafka/consumer.go | 12 ---------- 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/cmd/allspark/main.go b/cmd/allspark/main.go index 43e81e5..54f3847 100644 --- a/cmd/allspark/main.go +++ b/cmd/allspark/main.go @@ -184,28 +184,30 @@ func main() { srv.Run() }() - // Kafka server - wg.Add(1) - go func() { - defer wg.Done() - consumer := kafka.NewConsumer(configuration.KafkaServer.BootstrapServer, configuration.KafkaServer.Topic, configuration.KafkaServer.ConsumerGroup, logger) - srv := server.New(consumer, logger, errorHandler) - if wl != nil { - srv.SetWorkload(wl) - } - - kafkaRequests, err := request.CreateRequestsFromStringSlice(viper.GetStringSlice("kafkaRequests"), logger.WithField("server", "kafka")) - if err != nil { - panic(err) - } - if len(kafkaRequests) == 0 { - kafkaRequests = requests - } - - srv.SetRequests(kafkaRequests) - srv.SetSQLClient(sqlClient) - srv.Run() - }() + if configuration.KafkaServer.BootstrapServer != "" && configuration.KafkaServer.Topic != "" && configuration.KafkaServer.ConsumerGroup != "" { + // Kafka server + wg.Add(1) + go func() { + defer wg.Done() + consumer := kafka.NewConsumer(configuration.KafkaServer.BootstrapServer, configuration.KafkaServer.Topic, configuration.KafkaServer.ConsumerGroup, logger) + srv := server.New(consumer, logger, errorHandler) + if wl != nil { + srv.SetWorkload(wl) + } + + kafkaRequests, err := request.CreateRequestsFromStringSlice(viper.GetStringSlice("kafkaRequests"), logger.WithField("server", "kafka")) + if err != nil { + panic(err) + } + if len(kafkaRequests) == 0 { + kafkaRequests = requests + } + + srv.SetRequests(kafkaRequests) + srv.SetSQLClient(sqlClient) + srv.Run() + }() + } wg.Wait() } diff --git a/internal/kafka/consumer.go b/internal/kafka/consumer.go index 471d3a3..2b51b85 100644 --- a/internal/kafka/consumer.go +++ b/internal/kafka/consumer.go @@ -74,17 +74,5 @@ func (c *Consumer) Close() error { } func (c *Consumer) Validate() (*Consumer, error) { - if c.BootstrapServer == "" { - c.BootstrapServer = "kafka-all-broker.kafka:29092" - } - - if c.ConsumerGroup == "" { - c.ConsumerGroup = "allspark-consumer-group" - } - - if c.Topic == "" { - c.Topic = "example-topic" - } - return c, nil }