From 656b1faaaff778cfbbde9cd3fc76f4ef03a0bd03 Mon Sep 17 00:00:00 2001 From: benclive Date: Mon, 10 Feb 2025 12:56:39 +0000 Subject: [PATCH] fix(dataobj): Fix shutdown race in dataobj consumer (#16157) --- pkg/dataobj/consumer/service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index 3f522e38a7c6c..1c36bf2057040 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -109,6 +109,10 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie func (s *Service) handlePartitionsRevoked(partitions map[string][]int32) { level.Info(s.logger).Log("msg", "partitions revoked", "partitions", formatPartitionsMap(partitions)) + if s.State() == services.Stopping { + // On shutdown, franz-go will send one more partitionRevoked event which we need to ignore to shutdown gracefully. + return + } s.partitionMtx.Lock() defer s.partitionMtx.Unlock()