Skip to content

Commit

Permalink
feat: add max_processing_time config to Kafka Consumer input (#9988)
Browse files Browse the repository at this point in the history
  • Loading branch information
cruscio authored Oct 28, 2021
1 parent 96c66d3 commit 343e846
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
9 changes: 9 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ and use the old zookeeper connection method.
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Maximum amount of time the consumer should take to process messages. If
## the debug log prints messages from sarama about 'abandoning subscription
## to [topic] because consuming was taking too long', increase this value to
## longer than the time taken by the output plugin(s).
##
## Note that the effective timeout could be between 'max_processing_time' and
## '2 * max_processing_time'.
# max_processing_time = "100ms"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
33 changes: 25 additions & 8 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Shopify/sarama"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -101,6 +102,15 @@ const sampleConfig = `
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Maximum amount of time the consumer should take to process messages. If
## the debug log prints messages from sarama about 'abandoning subscription
## to [topic] because consuming was taking too long', increase this value to
## longer than the time taken by the output plugin(s).
##
## Note that the effective timeout could be between 'max_processing_time' and
## '2 * max_processing_time'.
# max_processing_time = "100ms"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand All @@ -110,6 +120,7 @@ const sampleConfig = `

const (
defaultMaxUndeliveredMessages = 1000
defaultMaxProcessingTime = config.Duration(100 * time.Millisecond)
defaultConsumerGroup = "telegraf_metrics_consumers"
reconnectDelay = 5 * time.Second
)
Expand All @@ -118,14 +129,15 @@ type empty struct{}
type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Brokers []string `toml:"brokers"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MaxProcessingTime config.Duration `toml:"max_processing_time"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`

kafka.ReadConfig

Expand Down Expand Up @@ -172,6 +184,9 @@ func (k *KafkaConsumer) Init() error {
if k.MaxUndeliveredMessages == 0 {
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
}
if time.Duration(k.MaxProcessingTime) == 0 {
k.MaxProcessingTime = defaultMaxProcessingTime
}
if k.ConsumerGroup == "" {
k.ConsumerGroup = defaultConsumerGroup
}
Expand Down Expand Up @@ -209,6 +224,8 @@ func (k *KafkaConsumer) Init() error {
k.ConsumerCreator = &SaramaCreator{}
}

config.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime)

k.config = config
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/parsers/value"
Expand Down Expand Up @@ -64,6 +65,7 @@ func TestInit(t *testing.T) {
require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages)
require.Equal(t, plugin.config.ClientID, "Telegraf")
require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest)
require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 100*time.Millisecond)
},
},
{
Expand Down Expand Up @@ -165,6 +167,16 @@ func TestInit(t *testing.T) {
require.True(t, plugin.config.Net.TLS.Enable)
},
},
{
name: "custom max_processing_time",
plugin: &KafkaConsumer{
MaxProcessingTime: config.Duration(1000 * time.Millisecond),
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 1000*time.Millisecond)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 343e846

Please sign in to comment.