Skip to content

Commit

Permalink
Merge pull request cloudflare#47 from dwayn/master
Browse files Browse the repository at this point in the history
Added the ability to configure the log message format for kafka
  • Loading branch information
lspgn authored Dec 12, 2019
2 parents 610850e + e620ed0 commit 2a23ecc
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions transport/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"strings"

flowmessage "github.com/cloudflare/goflow/pb"
"github.com/cloudflare/goflow/utils"
proto "github.com/golang/protobuf/proto"

//"github.com/golang/protobuf/descriptor"
"errors"
"flag"
sarama "github.com/Shopify/sarama"
"os"
"reflect"
"strings"

sarama "github.com/Shopify/sarama"
)

var (
Expand All @@ -27,6 +30,9 @@ var (

KafkaHashing *bool
KafkaKeying *string
KafkaVersion *string

kafkaConfigVersion sarama.KafkaVersion = sarama.V0_11_0_0
)

type KafkaState struct {
Expand All @@ -36,6 +42,17 @@ type KafkaState struct {
keying []string
}

// SetKafkaVersion sets the KafkaVersion that is used to set the log message format version
func SetKafkaVersion(version sarama.KafkaVersion) {
kafkaConfigVersion = version
}

// ParseKafkaVersion is a pass through to sarama.ParseKafkaVersion to get a KafkaVersion struct by a string version that can be passed into SetKafkaVersion
// This function is here so that calling code need not import sarama to set KafkaVersion
func ParseKafkaVersion(versionString string) (sarama.KafkaVersion, error) {
return sarama.ParseKafkaVersion(versionString)
}

func RegisterFlags() {
KafkaTLS = flag.Bool("kafka.tls", false, "Use TLS to connect to Kafka")
KafkaSASL = flag.Bool("kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)")
Expand All @@ -47,9 +64,15 @@ func RegisterFlags() {

KafkaHashing = flag.Bool("kafka.hashing", false, "Enable partitioning by hash instead of random")
KafkaKeying = flag.String("kafka.key", "SamplerAddr,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
KafkaVersion = flag.String("kafka.version", "0.11.0.0", "Log message version (must be a version that parses per sarama.ParseKafkaVersion)")
}

func StartKafkaProducerFromArgs(log utils.Logger) (*KafkaState, error) {
kVersion, err := ParseKafkaVersion(*KafkaVersion)
if err != nil {
return nil, err
}
SetKafkaVersion(kVersion)
addrs := make([]string, 0)
if *KafkaSrv != "" {
addrs, _ = utils.GetServiceAddresses(*KafkaSrv)
Expand All @@ -61,6 +84,7 @@ func StartKafkaProducerFromArgs(log utils.Logger) (*KafkaState, error) {

func StartKafkaProducer(addrs []string, topic string, hashing bool, keying string, useTls bool, useSasl bool, logErrors bool, log utils.Logger) (*KafkaState, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = kafkaConfigVersion
kafkaConfig.Producer.Return.Successes = false
kafkaConfig.Producer.Return.Errors = logErrors
if useTls {
Expand Down

0 comments on commit 2a23ecc

Please sign in to comment.