Skip to content

Commit

Permalink
Remove default kafka bootstrap consumer behavior
Browse files Browse the repository at this point in the history
only create a kafka consumer if explicitly set
  • Loading branch information
tiswanso committed Oct 6, 2022
1 parent ed7192b commit 7fa55b8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 34 deletions.
46 changes: 24 additions & 22 deletions cmd/allspark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,30 @@ func main() {
srv.Run()
}()

// Kafka server
wg.Add(1)
go func() {
defer wg.Done()
consumer := kafka.NewConsumer(configuration.KafkaServer.BootstrapServer, configuration.KafkaServer.Topic, configuration.KafkaServer.ConsumerGroup, logger)
srv := server.New(consumer, logger, errorHandler)
if wl != nil {
srv.SetWorkload(wl)
}

kafkaRequests, err := request.CreateRequestsFromStringSlice(viper.GetStringSlice("kafkaRequests"), logger.WithField("server", "kafka"))
if err != nil {
panic(err)
}
if len(kafkaRequests) == 0 {
kafkaRequests = requests
}

srv.SetRequests(kafkaRequests)
srv.SetSQLClient(sqlClient)
srv.Run()
}()
if configuration.KafkaServer.BootstrapServer != "" && configuration.KafkaServer.Topic != "" && configuration.KafkaServer.ConsumerGroup != "" {
// Kafka server
wg.Add(1)
go func() {
defer wg.Done()
consumer := kafka.NewConsumer(configuration.KafkaServer.BootstrapServer, configuration.KafkaServer.Topic, configuration.KafkaServer.ConsumerGroup, logger)
srv := server.New(consumer, logger, errorHandler)
if wl != nil {
srv.SetWorkload(wl)
}

kafkaRequests, err := request.CreateRequestsFromStringSlice(viper.GetStringSlice("kafkaRequests"), logger.WithField("server", "kafka"))
if err != nil {
panic(err)
}
if len(kafkaRequests) == 0 {
kafkaRequests = requests
}

srv.SetRequests(kafkaRequests)
srv.SetSQLClient(sqlClient)
srv.Run()
}()
}

wg.Wait()
}
12 changes: 0 additions & 12 deletions internal/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,5 @@ func (c *Consumer) Close() error {
}

func (c *Consumer) Validate() (*Consumer, error) {
if c.BootstrapServer == "" {
c.BootstrapServer = "kafka-all-broker.kafka:29092"
}

if c.ConsumerGroup == "" {
c.ConsumerGroup = "allspark-consumer-group"
}

if c.Topic == "" {
c.Topic = "example-topic"
}

return c, nil
}

0 comments on commit 7fa55b8

Please sign in to comment.