diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 6ea35d7b132..9ec72841bd5 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -17,6 +17,7 @@ package kafkareceiver import ( "context" "fmt" + "sync" "github.com/Shopify/sarama" "go.opencensus.io/stats" @@ -121,6 +122,7 @@ type consumerGroupHandler struct { unmarshaller Unmarshaller nextConsumer consumer.TraceConsumer ready chan bool + readyCloser sync.Once logger *zap.Logger } @@ -128,7 +130,9 @@ type consumerGroupHandler struct { var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil) func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { - close(c.ready) + c.readyCloser.Do(func() { + close(c.ready) + }) statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)} _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1)) return nil