Skip to content

Commit

Permalink
fix(dataobj): Fix shutdown race in dataobj consumer (#16157)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Feb 10, 2025
1 parent 82cfaea commit 656b1fa
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/dataobj/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie

func (s *Service) handlePartitionsRevoked(partitions map[string][]int32) {
level.Info(s.logger).Log("msg", "partitions revoked", "partitions", formatPartitionsMap(partitions))
if s.State() == services.Stopping {
// On shutdown, franz-go will send one more partitionRevoked event which we need to ignore to shutdown gracefully.
return
}
s.partitionMtx.Lock()
defer s.partitionMtx.Unlock()

Expand Down

0 comments on commit 656b1fa

Please sign in to comment.