Skip to content

Commit

Permalink
Fix Kafka receiver closing ready channel multiple times (#1696)
Browse files Browse the repository at this point in the history
Avoid closing ready channel multiple times.

Fixes #1692
  • Loading branch information
pavolloffay authored Sep 2, 2020
1 parent 9d7c3a3 commit f3b5b45
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kafkareceiver
import (
"context"
"fmt"
"sync"

"github.com/Shopify/sarama"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -121,14 +122,17 @@ type consumerGroupHandler struct {
unmarshaller Unmarshaller
nextConsumer consumer.TraceConsumer
ready chan bool
readyCloser sync.Once

logger *zap.Logger
}

var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)

func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
close(c.ready)
c.readyCloser.Do(func() {
close(c.ready)
})
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
return nil
Expand Down

0 comments on commit f3b5b45

Please sign in to comment.