use exponential backoff
yiquanzhou committed Jan 2, 2025
1 parent 494fce9 commit 3804d00
Showing 5 changed files with 41 additions and 6 deletions.
3 changes: 2 additions & 1 deletion receiver/kafkareceiver/config.go
@@ -7,6 +7,7 @@ import (
     "time"
 
     "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/config/configretry"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
     "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
@@ -87,7 +88,7 @@ type Config struct {
     MaxFetchSize int32 `mapstructure:"max_fetch_size"`
 
     // In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
-    ErrorBackoff time.Duration `mapstructure:"error_backoff"`
+    ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
 }
 
 const (
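The error_backoff setting changes here from a single constant delay (time.Duration) to the collector's shared configretry.BackOffConfig block; when the block is omitted or enabled is false, newExponentialBackOff further down returns nil and the receiver simply skips the wait. A minimal sketch of the new field populated programmatically, reusing the values from the testdata at the bottom of this diff (illustrative only; in practice the struct is unmarshalled from the collector YAML):

package main

import (
    "fmt"
    "time"

    "go.opentelemetry.io/collector/config/configretry"
)

func main() {
    // Field names match the ones the new ErrorBackOff option uses above.
    // RandomizationFactor is left at its zero value here, which means no
    // jitter once the values are copied onto a backoff.ExponentialBackOff.
    cfg := configretry.BackOffConfig{
        Enabled:         true,
        InitialInterval: 1 * time.Second,
        MaxInterval:     10 * time.Second,
        MaxElapsedTime:  1 * time.Minute,
        Multiplier:      1.5,
    }
    fmt.Printf("%+v\n", cfg)
}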
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
@@ -11,6 +11,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/config/configretry"
     "go.opentelemetry.io/collector/config/configtls"
     "go.opentelemetry.io/collector/confmap/confmaptest"
 
@@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) {
                 MinFetchSize: 1,
                 DefaultFetchSize: 1048576,
                 MaxFetchSize: 0,
+                ErrorBackOff: configretry.BackOffConfig{
+                    Enabled: false,
+                },
             },
         },
         {
@@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) {
                 MinFetchSize: 1,
                 DefaultFetchSize: 1048576,
                 MaxFetchSize: 0,
+                ErrorBackOff: configretry.BackOffConfig{
+                    Enabled: true,
+                    InitialInterval: 1 * time.Second,
+                    MaxInterval: 10 * time.Second,
+                    MaxElapsedTime: 1 * time.Minute,
+                    Multiplier: 1.5,
+                },
             },
         },
     }
21 changes: 18 additions & 3 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -14,6 +14,7 @@ import (
     "github.com/IBM/sarama"
     "github.com/cenkalti/backoff/v4"
     "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/config/configretry"
     "go.opentelemetry.io/collector/consumer"
     "go.opentelemetry.io/collector/pdata/plog"
     "go.opentelemetry.io/collector/pdata/pmetric"
@@ -207,7 +208,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
         messageMarking: c.messageMarking,
         headerExtractor: &nopHeaderExtractor{},
         telemetryBuilder: c.telemetryBuilder,
-        backOff: backoff.NewConstantBackOff(c.config.ErrorBackoff),
+        backOff: newExponentialBackOff(c.config.ErrorBackOff),
     }
     if c.headerExtraction {
         consumerGroup.headerExtractor = &headerExtractor{
@@ -221,6 +222,20 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
     return nil
 }
 
+func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
+    if !config.Enabled {
+        return nil
+    }
+    backOff := backoff.NewExponentialBackOff()
+    backOff.InitialInterval = config.InitialInterval
+    backOff.RandomizationFactor = config.RandomizationFactor
+    backOff.Multiplier = config.Multiplier
+    backOff.MaxInterval = config.MaxInterval
+    backOff.MaxElapsedTime = config.MaxElapsedTime
+    backOff.Reset()
+    return backOff
+}
+
 func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
     defer c.consumeLoopWG.Done()
     for {
@@ -484,7 +499,7 @@ type tracesConsumerGroupHandler struct {
     autocommitEnabled bool
     messageMarking MessageMarking
     headerExtractor HeaderExtractor
-    backOff backoff.BackOff
+    backOff *backoff.ExponentialBackOff
 }
 
 type metricsConsumerGroupHandler struct {
@@ -587,7 +602,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
                 if c.messageMarking.After && c.messageMarking.OnError {
                     session.MarkMessage(message, "")
                 }
-                if errorRequiresBackoff(err) {
+                if errorRequiresBackoff(err) && c.backOff != nil {
                     select {
                     case <-session.Context().Done():
                         return nil
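Because newExponentialBackOff returns nil when the backoff is disabled, ConsumeClaim now also checks c.backOff != nil before waiting. The hunk above is truncated, so here is a minimal, self-contained sketch of the usual wait-and-reset pattern around such a backoff; only NextBackOff, Reset and backoff.Stop are real API from github.com/cenkalti/backoff/v4, while the helper name and surrounding loop are assumptions, not the receiver's actual code:

package main

import (
    "context"
    "time"

    "github.com/cenkalti/backoff/v4"
)

// waitBeforeRetry blocks for the next backoff interval. It returns false when
// the policy is exhausted (NextBackOff returns backoff.Stop once MaxElapsedTime
// is exceeded) or when the context is cancelled, telling the caller to give up.
func waitBeforeRetry(ctx context.Context, b *backoff.ExponentialBackOff) bool {
    delay := b.NextBackOff()
    if delay == backoff.Stop {
        return false
    }
    select {
    case <-ctx.Done():
        return false
    case <-time.After(delay):
        return true
    }
}

func main() {
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = 100 * time.Millisecond
    b.MaxElapsedTime = time.Second

    for waitBeforeRetry(context.Background(), b) {
        // Retry the failing operation here; on success call b.Reset() so the
        // next error starts again from InitialInterval.
    }
}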
6 changes: 4 additions & 2 deletions receiver/kafkareceiver/kafka_receiver_test.go
@@ -353,7 +353,7 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
         {
             name: "memory limiter data refused error",
             expectedError: memoryLimiterError,
-            expectedBackoff: 1 * time.Second,
+            expectedBackoff: backoff.DefaultInitialInterval,
         },
         {
             name: "other consumer error",
@@ -364,6 +364,8 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
+            backOff := backoff.NewExponentialBackOff()
+            backOff.RandomizationFactor = 0
             c := tracesConsumerGroupHandler{
                 unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
                 logger: zap.NewNop(),
@@ -372,7 +374,7 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
                 obsrecv: obsrecv,
                 headerExtractor: &nopHeaderExtractor{},
                 telemetryBuilder: nopTelemetryBuilder(t),
-                backOff: backoff.NewConstantBackOff(1 * time.Second),
+                backOff: backOff,
             }
 
             wg := sync.WaitGroup{}
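The handler test drops the constant 1s backoff in favour of a default backoff.NewExponentialBackOff() with RandomizationFactor set to 0: without jitter the first NextBackOff call returns exactly backoff.DefaultInitialInterval (500ms), which is what expectedBackoff now asserts. A standalone illustration of that property (not part of the test file):

package main

import (
    "fmt"

    "github.com/cenkalti/backoff/v4"
)

func main() {
    b := backoff.NewExponentialBackOff()
    b.RandomizationFactor = 0 // no jitter, so the intervals are deterministic

    // The first interval equals the library default of 500ms.
    fmt.Println(b.NextBackOff() == backoff.DefaultInitialInterval) // true
}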
6 changes: 6 additions & 0 deletions receiver/kafkareceiver/testdata/config.yaml
@@ -35,3 +35,9 @@ kafka/logs:
   retry:
     max: 10
     backoff: 5s
+  error_backoff:
+    enabled: true
+    initial_interval: 1s
+    max_interval: 10s
+    max_elapsed_time: 1m
+    multiplier: 1.5
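With the values above, the wait between redeliveries grows as 1s, 1.5s, 2.25s, 3.375s, ... (scaled by jitter, if any is configured), is capped at max_interval (10s), and the policy signals backoff.Stop once roughly max_elapsed_time (1m) of retrying has elapsed. A small standalone loop showing that progression with jitter disabled for readability (illustrative only, not part of the change):

package main

import (
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v4"
)

func main() {
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = 1 * time.Second
    b.RandomizationFactor = 0 // disable jitter so the sequence is easy to read
    b.Multiplier = 1.5
    b.MaxInterval = 10 * time.Second
    b.MaxElapsedTime = 1 * time.Minute

    // Prints 1s, 1.5s, 2.25s, 3.375s, 5.0625s, 7.59375s, 10s, 10s.
    for i := 0; i < 8; i++ {
        fmt.Println(b.NextBackOff())
    }
}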
