Skip to content

Commit

Permalink
Merge pull request cloudflare#48 from netsampler/feature/kafka-flush-…
Browse files Browse the repository at this point in the history
…control

Add flags to control Kafka Flush parameters
  • Loading branch information
lspgn authored Nov 14, 2021
2 parents de5e751 + c145be6 commit 2e1cf5b
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions transport/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"strings"
"time"

sarama "github.com/Shopify/sarama"
"github.com/netsampler/goflow2/transport"
Expand All @@ -18,11 +19,14 @@ import (
)

type KafkaDriver struct {
kafkaTLS bool
kafkaSASL bool
kafkaTopic string
kafkaSrv string
kafkaBrk string
kafkaTLS bool
kafkaSASL bool
kafkaTopic string
kafkaSrv string
kafkaBrk string
kafkaMaxMsgBytes int
kafkaFlushBytes int
kafkaFlushFrequency time.Duration

kafkaLogErrors bool

Expand All @@ -41,6 +45,9 @@ func (d *KafkaDriver) Prepare() error {
flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes")
flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes")
flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency")

flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors")
flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")
Expand All @@ -61,6 +68,9 @@ func (d *KafkaDriver) Init(context.Context) error {
kafkaConfig.Version = kafkaConfigVersion
kafkaConfig.Producer.Return.Successes = false
kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors
kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
if d.kafkaTLS {
rootCAs, err := x509.SystemCertPool()
if err != nil {
Expand Down

0 comments on commit 2e1cf5b

Please sign in to comment.