Fixes output.kafka bulk sizes (#12254)
Set sarama `Producer.Flush` parameters based on the `bulk_max_size` and
`bulk_flush_frequency` config options to control the bulk size when the
Kafka output is used.
marqc authored and jsoriano committed Jun 10, 2019
1 parent 75e7c05 commit ff6c007
Showing 16 changed files with 90 additions and 24 deletions.
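
For context, the core of the change is how the Beats options map onto sarama's producer flush settings. A minimal standalone sketch against the public sarama API (broker address, topic name, and values are illustrative, not taken from the commit):

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// bulk_max_size caps how many messages sarama batches into a single request.
	cfg.Producer.Flush.MaxMessages = 2048

	// bulk_flush_frequency, when greater than 0, adds a timer-based flush so a
	// partially filled batch is still sent after this interval.
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Real code would also drain producer.Errors() to avoid blocking.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "beats",
		Value: sarama.StringEncoder("hello"),
	}
}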
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed a memory leak when using the add_process_metadata processor under Windows. {pull}12100[12100]
- Fix of docker json parser for missing "log" jsonkey in docker container's log {issue}11464[11464]
- Fixed Beat ID being reported by GET / API. {pull}12180[12180]
+- Fixed setting bulk max size in kafka output. {pull}12254[12254]
- Add host.os.codename to fields.yml. {pull}12261[12261]
- Fix `@timestamp` being duplicated in events if `@timestamp` is set in a
processor (or by any code utilizing `PutValue()` on a `beat.Event`).
@@ -184,6 +185,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add number of goroutines to reported metrics. {pull}12135[12135]
- Add `proxy_disable` output flag to explicitly ignore proxy environment variables. {issue}11713[11713] {pull}12243[12243]
- Processor `add_cloud_metadata` adds fields `cloud.account.id` and `cloud.image.id` for AWS EC2. {pull}12307[12307]
+- Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254]

*Auditbeat*

4 changes: 4 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -679,6 +679,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -1400,6 +1400,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -823,6 +823,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions journalbeat/journalbeat.reference.yml
@@ -619,6 +619,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions libbeat/_meta/config.reference.yml
@@ -567,6 +567,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -1147,6 +1147,10 @@ endif::[]

The maximum number of events to bulk in a single Kafka request. The default is 2048.

+===== `bulk_flush_frequency`
+
+Duration to wait before sending bulk Kafka request. 0 is no delay. The default is 0.
+
===== `timeout`

The number of seconds to wait for responses from the Kafka brokers before timing out. The default is 30s.
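
Together the two options bound each Kafka request by size and by time. A sketch of a configuration using both (hosts and topic are illustrative): a batch is sent once 2048 events accumulate or 500ms pass, whichever comes first.

output.kafka:
  hosts: ["kafka1:9092"]
  topic: "beats"
  bulk_max_size: 2048
  bulk_flush_frequency: 500ms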
56 changes: 32 additions & 24 deletions libbeat/outputs/kafka/config.go
@@ -37,26 +37,27 @@ import (
)

type kafkaConfig struct {
-	Hosts []string `config:"hosts" validate:"required"`
-	TLS *tlscommon.Config `config:"ssl"`
-	Timeout time.Duration `config:"timeout" validate:"min=1"`
-	Metadata metaConfig `config:"metadata"`
-	Key *fmtstr.EventFormatString `config:"key"`
-	Partition map[string]*common.Config `config:"partition"`
-	KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
-	MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
-	RequiredACKs *int `config:"required_acks" validate:"min=-1"`
-	BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
-	Compression string `config:"compression"`
-	CompressionLevel int `config:"compression_level"`
-	Version kafka.Version `config:"version"`
-	BulkMaxSize int `config:"bulk_max_size"`
-	MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
-	ClientID string `config:"client_id"`
-	ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
-	Username string `config:"username"`
-	Password string `config:"password"`
-	Codec codec.Config `config:"codec"`
+	Hosts []string `config:"hosts" validate:"required"`
+	TLS *tlscommon.Config `config:"ssl"`
+	Timeout time.Duration `config:"timeout" validate:"min=1"`
+	Metadata metaConfig `config:"metadata"`
+	Key *fmtstr.EventFormatString `config:"key"`
+	Partition map[string]*common.Config `config:"partition"`
+	KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
+	MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
+	RequiredACKs *int `config:"required_acks" validate:"min=-1"`
+	BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
+	Compression string `config:"compression"`
+	CompressionLevel int `config:"compression_level"`
+	Version kafka.Version `config:"version"`
+	BulkMaxSize int `config:"bulk_max_size"`
+	BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
+	MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
+	ClientID string `config:"client_id"`
+	ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
+	Username string `config:"username"`
+	Password string `config:"password"`
+	Codec codec.Config `config:"codec"`
}

type metaConfig struct {
@@ -81,10 +82,11 @@ var compressionModes = map[string]sarama.CompressionCodec{

func defaultConfig() kafkaConfig {
return kafkaConfig{
-		Hosts: nil,
-		TLS: nil,
-		Timeout: 30 * time.Second,
-		BulkMaxSize: 2048,
+		Hosts: nil,
+		TLS: nil,
+		Timeout: 30 * time.Second,
+		BulkMaxSize: 2048,
+		BulkFlushFrequency: 0,
		Metadata: metaConfig{
			Retry: metaRetryConfig{
				Max: 3,
@@ -209,6 +211,12 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
	// configure per broker go channel buffering
	k.ChannelBufferSize = config.ChanBufferSize

+	// configure bulk size
+	k.Producer.Flush.MaxMessages = config.BulkMaxSize
+	if config.BulkFlushFrequency > 0 {
+		k.Producer.Flush.Frequency = config.BulkFlushFrequency
+	}
+
	// configure client ID
	k.ClientID = config.ClientID
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
@@ -1296,6 +1296,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.reference.yml
@@ -1047,6 +1047,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
@@ -600,6 +600,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions x-pack/auditbeat/auditbeat.reference.yml
@@ -730,6 +730,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -1551,6 +1551,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions x-pack/functionbeat/functionbeat.reference.yml
@@ -738,6 +738,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
@@ -1360,6 +1360,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
4 changes: 4 additions & 0 deletions x-pack/winlogbeat/winlogbeat.reference.yml
@@ -612,6 +612,10 @@ output.elasticsearch:
# is 2048.
#bulk_max_size: 2048

+# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
+# is 0.
+#bulk_flush_frequency: 0s
+
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s