Extend Kafka output plugin (add LZ4 compression and batch sending) #4210
…utput plugin (add LZ4 compression + batch sending)
@danielnelson Hi!
@@ -181,7 +181,7 @@ func UTF16ToStringArray(buf []uint16) []string {
     stringLine := UTF16PtrToString(&buf[0])
     for stringLine != "" {
         strings = append(strings, stringLine)
-        nextLineStart += len(stringLine) + 1
+        nextLineStart += len([]rune(stringLine)) + 1
Can you split this change out into a separate PR and also add a unit test?
OK, no problem. I'll do it a little later.
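To illustrate why the one-line change to UTF16ToStringArray matters, here is a minimal sketch that compares the three lengths involved for a non-English name. The counter name and the helper utf16Len are hypothetical and are not part of the patch; the point is only that an offset into a []uint16 buffer has to advance in UTF-16 code units, not UTF-8 bytes.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// utf16Len reports how many UTF-16 code units s occupies. This is the unit
// an index into a []uint16 buffer has to advance by.
// Hypothetical helper, not part of the patch.
func utf16Len(s string) int {
	return len(utf16.Encode([]rune(s)))
}

func main() {
	name := "Процессор" // hypothetical non-English counter name

	fmt.Println(len(name))         // length in UTF-8 bytes
	fmt.Println(len([]rune(name))) // length in code points
	fmt.Println(utf16Len(name))    // length in UTF-16 code units
}
```

For this Cyrillic name the byte length is twice the rune length, so advancing by len(stringLine) overshoots the next string. The rune count and the UTF-16 length agree here, but they can differ for characters outside the Basic Multilingual Plane, which is one case a unit test could cover.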
plugins/outputs/kafka/kafka.go
Outdated
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
Revert to the 2-space indentation.
Fixed
if err != nil {
    return err
}
config.Version = kafkaVersion
Is this required for this patch to work? What happens if it is removed? The documentation seems to indicate that this is used to control some features; what is improved by adding it?
If this parameter is not set, the sarama library returns an error when we switch to LZ4 compression.
https://github.com/Shopify/sarama/blob/70f6a705d4a17af059acbc6946fb2bd30762acd7/config_test.go#L199
https://github.com/Shopify/sarama/blob/44e7121d3b5189096ae4ef90c442f5f806c10fc9/config.go#L263
https://github.com/Shopify/sarama/blob/44e7121d3b5189096ae4ef90c442f5f806c10fc9/config.go#L416
In some places sarama also uses the 'Version' parameter to switch to the "new" Kafka API, which is supposed to perform better than the "old" one.
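The version gate described in this reply can be modeled without the library. The sketch below is a simplified stand-in, not sarama's code: the version type, atLeast, and validate are hypothetical names, and the 0.10.0.0 threshold is an assumption taken from the linked config.go lines.

```go
package main

import (
	"errors"
	"fmt"
)

// version is a simplified, hypothetical stand-in for sarama.KafkaVersion.
type version [4]uint

// atLeast reports whether v >= other, comparing component by component.
func (v version) atLeast(other version) bool {
	for i := range v {
		if v[i] != other[i] {
			return v[i] > other[i]
		}
	}
	return true
}

// validate models the kind of check the linked sarama code performs:
// LZ4 is rejected unless the configured version is new enough.
// The threshold 0.10.0.0 is an assumption, see the lead-in.
func validate(compression string, v version) error {
	if compression == "lz4" && !v.atLeast(version{0, 10, 0, 0}) {
		return errors.New("lz4 compression requires version >= 0.10.0.0")
	}
	return nil
}

func main() {
	fmt.Println(validate("lz4", version{0, 8, 2, 0}))  // older version: error
	fmt.Println(validate("lz4", version{0, 10, 0, 0})) // new enough: <nil>
}
```

Under this model, leaving the version at an older value while requesting LZ4 fails validation, which matches the behaviour reported above.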
Done, but CI fails.
closes #4161
Looks like a bug in the syslog tests, I'll rerun the tests.
@@ -186,6 +202,11 @@ func (k *Kafka) Connect() error {
    }
    config := sarama.NewConfig()

    kafkaVersion, err := sarama.ParseKafkaVersion(k.Version)
If version is set to an empty string I believe this will panic.
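One way to address this concern is to parse the version only when the user actually configured one. The sketch below is a minimal illustration under stated assumptions: fragileParse is a hypothetical stand-in for sarama.ParseKafkaVersion that deliberately does not tolerate an empty string, and versionIfSet is a hypothetical helper, not code from the PR.

```go
package main

import "fmt"

// fragileParse is a hypothetical stand-in for a version parser that does not
// tolerate an empty string: indexing s[0] panics when s == "".
func fragileParse(s string) (string, error) {
	if s[0] < '0' || s[0] > '9' {
		return "", fmt.Errorf("invalid version %q", s)
	}
	return s, nil
}

// versionIfSet calls the parser only when a version was configured.
// set is false when raw is empty, so the caller can keep the library default.
func versionIfSet(raw string) (v string, set bool, err error) {
	if raw == "" {
		return "", false, nil
	}
	v, err = fragileParse(raw)
	if err != nil {
		return "", false, err
	}
	return v, true, nil
}

func main() {
	fmt.Println(versionIfSet(""))         // skipped, no panic
	fmt.Println(versionIfSet("0.10.2.0")) // parsed
}
```

In Connect, the equivalent guard would wrap the ParseKafkaVersion call and the config.Version assignment in a check that k.Version is non-empty.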
buf, err := k.serializer.Serialize(metric)
if err != nil {
    return err
routingTag = metric.Tags()[k.RoutingTag]
While you are in this code, can you change this to use metric.GetTag(k.RoutingTag), which doesn't allocate?
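The difference between the two accessors can be sketched with a small stand-in type. The metric type below is hypothetical and only assumes that tags are stored in a slice; it mirrors the Tags() and GetTag() method shapes of telegraf.Metric but is not Telegraf's implementation.

```go
package main

import "fmt"

// tag is one key/value pair.
type tag struct{ key, value string }

// metric is a minimal, hypothetical stand-in for a telegraf.Metric,
// assuming tags are kept in a slice.
type metric struct{ tags []tag }

// Tags builds a fresh map on every call, which allocates.
func (m *metric) Tags() map[string]string {
	out := make(map[string]string, len(m.tags))
	for _, t := range m.tags {
		out[t.key] = t.value
	}
	return out
}

// GetTag scans the slice and returns the value directly, without
// building an intermediate map.
func (m *metric) GetTag(key string) (string, bool) {
	for _, t := range m.tags {
		if t.key == key {
			return t.value, true
		}
	}
	return "", false
}

func main() {
	m := &metric{tags: []tag{{"host", "a"}, {"region", "eu"}}}

	v, ok := m.GetTag("host")
	fmt.Println(v, ok)
	fmt.Println(m.Tags()["host"])
}
```

Both calls return the same value for an existing tag; GetTag additionally reports whether the tag was present, which a map index on Tags() only does with the two-value form.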
}
routingTags[routingTag] = true
I think you can remove the routingTags map, and just base the creation of the submap on whether if _, found := routingTags[routingTag]; !found is true.
var (
    metricsmap = map[string]map[string][]telegraf.Metric{}
    routingTags = map[string]bool{}
    routingTag string
Don't declare routingTag here; just create it on line 270.
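The two suggestions above (dropping the helper map and declaring the routing key where it is used) can be sketched together. This is a minimal illustration, not the PR's code: the item type and group function are hypothetical, and it assumes the outer map key is the topic and the inner key is the routing tag value, with the submap created on demand by looking it up in the nested map itself.

```go
package main

import "fmt"

// item is a hypothetical stand-in for a serialized metric together with
// the topic and routing key it should be sent with.
type item struct{ topic, key, line string }

// group buckets items by topic and then by routing key. The inner map is
// created the first time a topic is seen, so no separate "seen" map is needed.
func group(items []item) map[string]map[string][]string {
	grouped := map[string]map[string][]string{}
	for _, it := range items {
		byKey, found := grouped[it.topic]
		if !found {
			byKey = map[string][]string{}
			grouped[it.topic] = byKey
		}
		// Appending to a missing key works because the zero value is a nil slice.
		byKey[it.key] = append(byKey[it.key], it.line)
	}
	return grouped
}

func main() {
	g := group([]item{
		{"telegraf", "host-a", "cpu usage=1"},
		{"telegraf", "host-a", "mem used=2"},
		{"telegraf", "host-b", "cpu usage=3"},
	})
	fmt.Println(len(g["telegraf"]["host-a"]), len(g["telegraf"]["host-b"]))
}
```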
@korservick Would you be able to sign the CLA too?
Guys, one thing about #4161 is that it allows a configurable batch size. While letting the producer flush the entire buffer on each flush will be fine in most cases, there are certain extremely high-volume use cases for which we may want a separate batch size parameter associated with this feature. In Kafka it's necessary to set a max message size at the broker level. This isn't really a "message size" but rather more like a batch size. So if my line protocol lines are 100 bytes each and I send a batch of 10, that is 1000 bytes for that package (let's assume no compression for now). If I am not allowed to control the batch size, or set its absolute ceiling, I will run into issues with the max message size.
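To make the arithmetic above concrete, here is a minimal sketch of splitting serialized lines into batches that stay under a byte ceiling. The function names and the ceiling values are hypothetical; neither the PR nor sarama is claimed to work this way, and compression is ignored as in the comment.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBySize groups lines into batches whose total size does not exceed
// maxBytes. A single line larger than maxBytes still gets its own batch.
func splitBySize(lines []string, maxBytes int) [][]string {
	var batches [][]string
	var cur []string
	size := 0
	for _, l := range lines {
		if len(cur) > 0 && size+len(l) > maxBytes {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, l)
		size += len(l)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

// makeLines returns n dummy lines of the given size in bytes.
func makeLines(n, size int) []string {
	lines := make([]string, n)
	for i := range lines {
		lines[i] = strings.Repeat("x", size)
	}
	return lines
}

func main() {
	lines := makeLines(10, 100) // ten 100-byte lines, 1000 bytes in total

	fmt.Println(len(splitBySize(lines, 1000))) // everything fits in one batch
	fmt.Println(len(splitBySize(lines, 250)))  // two lines per batch
}
```

With a ceiling of 1000 bytes the ten lines go out as one batch; with 250 bytes they are split into batches of two, which is the kind of control the comment asks for.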
There are several flush Config options in sarama that seem useful, such as
@danielnelson Each Kafka topic has a max message bytes config. Let's say it's set at 2 MB. One thing we'll want to make sure of is that we have the granularity of control to ensure our system isn't bumping up against that with batching. Is Sarama coded to respect that parameter? Just curious.
This option defaults to 1MB and currently we don't expose it as a configurable setting. With the current method in master where each message currently contains a single metric, it is unlikely we would bump up against this, but we could allow it to be configured. If we do end up adding batching then we definitely need to expose this setting.
Fix crash on non-English name of metrics on Windows (win_perf_counters input plugin) + extend kafka output plugin (add LZ4 compression + batch sending)
Required for all PRs: