Skip to content

Commit

Permalink
Added the ability to configure the log message format for kafka
Browse files Browse the repository at this point in the history
Updated the default log message format to version 0.11.0.0 as this is the minimum needed to support kafka 2.x
If needed, the message format version can be overridden with the kafka.version flag on any of the cli tools and by calling transport.SetKafkaVersion() if using as a library
  • Loading branch information
Dwayn Matthies committed Dec 12, 2019
1 parent 610850e commit e620ed0
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions transport/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"strings"

flowmessage "github.com/cloudflare/goflow/pb"
"github.com/cloudflare/goflow/utils"
proto "github.com/golang/protobuf/proto"

//"github.com/golang/protobuf/descriptor"
"errors"
"flag"
sarama "github.com/Shopify/sarama"
"os"
"reflect"
"strings"

sarama "github.com/Shopify/sarama"
)

var (
Expand All @@ -27,6 +30,9 @@ var (

KafkaHashing *bool
KafkaKeying *string
KafkaVersion *string

kafkaConfigVersion sarama.KafkaVersion = sarama.V0_11_0_0
)

type KafkaState struct {
Expand All @@ -36,6 +42,17 @@ type KafkaState struct {
keying []string
}

// SetKafkaVersion sets the KafkaVersion that is used to set the log message format version
func SetKafkaVersion(version sarama.KafkaVersion) {
kafkaConfigVersion = version
}

// ParseKafkaVersion is a pass through to sarama.ParseKafkaVersion to get a KafkaVersion struct by a string version that can be passed into SetKafkaVersion
// This function is here so that calling code need not import sarama to set KafkaVersion
func ParseKafkaVersion(versionString string) (sarama.KafkaVersion, error) {
return sarama.ParseKafkaVersion(versionString)
}

func RegisterFlags() {
KafkaTLS = flag.Bool("kafka.tls", false, "Use TLS to connect to Kafka")
KafkaSASL = flag.Bool("kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)")
Expand All @@ -47,9 +64,15 @@ func RegisterFlags() {

KafkaHashing = flag.Bool("kafka.hashing", false, "Enable partitioning by hash instead of random")
KafkaKeying = flag.String("kafka.key", "SamplerAddr,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
KafkaVersion = flag.String("kafka.version", "0.11.0.0", "Log message version (must be a version that parses per sarama.ParseKafkaVersion)")
}

func StartKafkaProducerFromArgs(log utils.Logger) (*KafkaState, error) {
kVersion, err := ParseKafkaVersion(*KafkaVersion)
if err != nil {
return nil, err
}
SetKafkaVersion(kVersion)
addrs := make([]string, 0)
if *KafkaSrv != "" {
addrs, _ = utils.GetServiceAddresses(*KafkaSrv)
Expand All @@ -61,6 +84,7 @@ func StartKafkaProducerFromArgs(log utils.Logger) (*KafkaState, error) {

func StartKafkaProducer(addrs []string, topic string, hashing bool, keying string, useTls bool, useSasl bool, logErrors bool, log utils.Logger) (*KafkaState, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = kafkaConfigVersion
kafkaConfig.Producer.Return.Successes = false
kafkaConfig.Producer.Return.Errors = logErrors
if useTls {
Expand Down

0 comments on commit e620ed0

Please sign in to comment.