Extend Kafka output plugin (add LZ4 compression and batch sending) #4210

Closed
wants to merge 3 commits
12 changes: 12 additions & 0 deletions plugins/outputs/kafka/README.md
@@ -5,6 +5,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
### Configuration:
```toml
[[outputs.kafka]]
## The version of Kafka that Telegraf (Sarama lib) will assume it is running against.
## Defaults to the oldest supported stable version. Since Kafka provides
## backwards-compatibility, setting it to a version older than you have
## will not break anything, although it may prevent you from using the
## latest features. Setting it to a version greater than you are actually
## running may lead to random breakage.
# version = "0.8.2.0"
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
@@ -46,6 +53,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression (requires Kafka 0.10.0.0 or newer and the 'version' parameter set in this config)
# compression_codec = 0

## RequiredAcks is used in Produce Requests to tell the broker how many
@@ -68,6 +76,10 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## until the next flush.
# max_retry = 3

## When true, metrics will be sent in one message per flush. Otherwise,
## metrics are written one metric per message.
# batch = false

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
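Putting the two new options together, a minimal configuration enabling LZ4 with batched sending might look like the fragment below (the broker address and topic are illustrative, not part of this patch):

```toml
[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"
  ## LZ4 requires declaring a broker version of at least 0.10.0.0
  version = "0.10.0.0"
  ## 3 : LZ4 compression
  compression_codec = 3
  ## send all metrics from a flush in a single message per topic
  batch = true
```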
87 changes: 74 additions & 13 deletions plugins/outputs/kafka/kafka.go
Expand Up @@ -21,6 +21,8 @@ var ValidTopicSuffixMethods = []string{

type (
Kafka struct {
// Kafka version
Version string
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
@@ -45,6 +47,8 @@ type (
CA string

tlsint.ClientConfig
// Batch messages (if the output format supports it)
BatchMessage bool `toml:"batch"`

// SASL Username
SASLUsername string `toml:"sasl_username"`
@@ -64,6 +68,13 @@ type (
)

var sampleConfig = `
## The version of Kafka that Telegraf (Sarama lib) will assume it is running against.
## Defaults to the oldest supported stable version. Since Kafka provides
## backwards-compatibility, setting it to a version older than you have
## will not break anything, although it may prevent you from using the
## latest features. Setting it to a version greater than you are actually
## running may lead to random breakage.
# version = "0.8.2.0"
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
@@ -76,7 +87,7 @@ var sampleConfig = `
## tags - suffix equals to separator + specified tags' values
## interleaved with separator

## Suffix equals to "_" + measurement name
## Suffix equals to "_" + measurement's name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"
@@ -105,6 +116,7 @@ var sampleConfig = `
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0

## RequiredAcks is used in Produce Requests to tell the broker how many
@@ -127,6 +139,10 @@ var sampleConfig = `
## until the next flush.
# max_retry = 3

## When true, metrics will be sent in one message per flush. Otherwise,
## metrics are written one metric per message.
# batch = false

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
@@ -186,6 +202,11 @@ func (k *Kafka) Connect() error {
}
config := sarama.NewConfig()

kafkaVersion, err := sarama.ParseKafkaVersion(k.Version)
Contributor: If version is set to an empty string I believe this will panic.

if err != nil {
return err
}
config.Version = kafkaVersion
Contributor: Is this required for this patch to work? What happens if it is removed? The documentation seems to indicate that this is used to control some features; what is improved by adding it?

Author (@korservick, May 31, 2018): If this parameter is not set, the sarama lib returns an error when we switch to LZ4 compression.
https://github.com/Shopify/sarama/blob/70f6a705d4a17af059acbc6946fb2bd30762acd7/config_test.go#L199
https://github.com/Shopify/sarama/blob/44e7121d3b5189096ae4ef90c442f5f806c10fc9/config.go#L263
https://github.com/Shopify/sarama/blob/44e7121d3b5189096ae4ef90c442f5f806c10fc9/config.go#L416

In some places sarama uses the 'Version' parameter to switch to the "new" Kafka API, which is supposed to be more performant than the "old" one.
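The gate the author describes can be sketched without pulling in sarama. The snippet below is a simplified stand-in for the rule sarama enforces in `Config.Validate()`; the type and function names here are illustrative, not sarama's actual API:

```go
package main

import "fmt"

// kafkaVersion is a simplified stand-in for sarama's KafkaVersion type.
type kafkaVersion [4]uint

// isAtLeast compares versions element-wise, most significant part first.
func (v kafkaVersion) isAtLeast(other kafkaVersion) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true
}

// v0_10_0_0 is the first broker version whose message format supports LZ4.
var v0_10_0_0 = kafkaVersion{0, 10, 0, 0}

// validateCompression mirrors the check sarama performs during validation:
// LZ4 is rejected unless the configured version is at least 0.10.0.0.
func validateCompression(compressionCodec int, v kafkaVersion) error {
	const lz4 = 3 // the plugin's compression_codec value for LZ4
	if compressionCodec == lz4 && !v.isAtLeast(v0_10_0_0) {
		return fmt.Errorf("lz4 compression requires Version >= V0_10_0_0")
	}
	return nil
}

func main() {
	// The plugin's default of "0.8.2.0" is too old for LZ4, so this returns an error...
	fmt.Println(validateCompression(3, kafkaVersion{0, 8, 2, 0}))
	// ...while declaring 0.10.0.0 allows it (prints <nil>).
	fmt.Println(validateCompression(3, v0_10_0_0))
}
```

This is also why the patch surfaces `version` in the config rather than hard-coding it: only users who opt into LZ4 need to raise it.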

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
config.Producer.Retry.Max = k.MaxRetry
@@ -235,30 +256,69 @@ func (k *Kafka) Description() string {
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
var (
metricsmap = map[string]map[string][]telegraf.Metric{}
routingTags = map[string]bool{}
routingTag string
Contributor: Don't declare routingTag here, just create it on 270.
)

if len(metrics) == 0 {
return nil
}

for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
routingTag = metric.Tags()[k.RoutingTag]
Contributor: So long as you are in this code, can you change this to use metric.GetTag(k.RoutingTag), which doesn't allocate.

if _, found := routingTags[routingTag]; !found {
metricsmap[routingTag] = make(map[string][]telegraf.Metric)
}
routingTags[routingTag] = true
Contributor: I think you can remove the routingTags map, and just base the creation of the submap on if _, found := routingTags[routingTag]; !found is true.
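The reviewer's suggestion amounts to letting the outer map itself record which routing tags have been seen. A self-contained sketch of that grouping logic, with a minimal `metric` struct standing in for `telegraf.Metric`:

```go
package main

import "fmt"

// metric is a minimal stand-in for telegraf.Metric: a routing tag,
// a destination topic, and an opaque serialized payload.
type metric struct {
	tag, topic, payload string
}

// groupByTagAndTopic buckets metrics by routing tag, then by topic.
// The outer map doubles as the "seen tags" set, so the separate
// routingTags map from the patch becomes unnecessary: the submap is
// created exactly once, on first sight of each tag.
func groupByTagAndTopic(metrics []metric) map[string]map[string][]string {
	grouped := map[string]map[string][]string{}
	for _, m := range metrics {
		if _, found := grouped[m.tag]; !found {
			grouped[m.tag] = make(map[string][]string)
		}
		grouped[m.tag][m.topic] = append(grouped[m.tag][m.topic], m.payload)
	}
	return grouped
}

func main() {
	in := []metric{
		{"hostA", "cpu", "m1"},
		{"hostA", "mem", "m2"},
		{"hostA", "cpu", "m3"},
		{"hostB", "cpu", "m4"},
	}
	grouped := groupByTagAndTopic(in)
	fmt.Println(len(grouped), len(grouped["hostA"]), len(grouped["hostA"]["cpu"])) // prints: 2 2 2
}
```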


topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
m.Key = sarama.StringEncoder(h)
if k.BatchMessage {
metricsmap[routingTag][topicName] = append(metricsmap[routingTag][topicName], metric)
} else {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if routingTag != "" {
m.Key = sarama.StringEncoder(routingTag)
}

_, _, err = k.producer.SendMessage(m)

if err != nil {
return fmt.Errorf("FAILED to send kafka message: %s", err)
}
}
}

for routingTag := range metricsmap {
for topicName := range metricsmap[routingTag] {
buf, err := k.serializer.SerializeBatch(metricsmap[routingTag][topicName])

_, _, err = k.producer.SendMessage(m)
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if routingTag != "" {
m.Key = sarama.StringEncoder(routingTag)
}

if err != nil {
return fmt.Errorf("FAILED to send kafka message: %s\n", err)
_, _, err = k.producer.SendMessage(m)

if err != nil {
return fmt.Errorf("FAILED to send kafka message: %s", err)
}
}
}
return nil
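Stripped of sarama specifics, the batch path above reduces to: serialize each (routing tag, topic) group into one payload and emit exactly one message per group. A hedged sketch of that shape, where `serializeBatch` and the returned `message` slice stand in for `k.serializer.SerializeBatch` and `k.producer.SendMessage`:

```go
package main

import (
	"fmt"
	"strings"
)

// message is a minimal stand-in for sarama.ProducerMessage.
type message struct {
	topic, key, value string
}

// serializeBatch stands in for k.serializer.SerializeBatch:
// the whole group collapses into a single payload.
func serializeBatch(metrics []string) string {
	return strings.Join(metrics, "\n")
}

// sendBatches builds one message per (routingTag, topic) group, keying
// each message by its routing tag, as the patch does.
func sendBatches(grouped map[string]map[string][]string) []message {
	var out []message
	for routingTag, topics := range grouped {
		for topicName, group := range topics {
			out = append(out, message{
				topic: topicName,
				key:   routingTag, // an empty tag would leave the key unset upstream
				value: serializeBatch(group),
			})
		}
	}
	return out
}

func main() {
	grouped := map[string]map[string][]string{
		"hostA": {"cpu": {"m1", "m2"}, "mem": {"m3"}},
	}
	fmt.Println(len(sendBatches(grouped))) // one message per topic under hostA: prints 2
}
```

This is where the bandwidth win comes from: with `batch = true`, a flush of N metrics to one topic costs one produce request instead of N, and compression (including the new LZ4 codec) operates on the combined payload.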
@@ -267,6 +327,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
func init() {
outputs.Add("kafka", func() telegraf.Output {
return &Kafka{
Version: "0.8.2.0",
MaxRetry: 3,
RequiredAcks: -1,
}
1 change: 1 addition & 0 deletions plugins/outputs/kafka/kafka_test.go
@@ -21,6 +21,7 @@ func TestConnectAndWrite(t *testing.T) {
brokers := []string{testutil.GetLocalHost() + ":9092"}
s, _ := serializers.NewInfluxSerializer()
k := &Kafka{
Version: "0.8.2.0",
Brokers: brokers,
Topic: "Test",
serializer: s,