From 786298469cf001b3a59f66e8309ca9d4740aba05 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 14 Dec 2022 18:52:15 -0800 Subject: [PATCH] Simplify locking for consumer info requests. Signed-off-by: Derek Collison --- server/consumer.go | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2240bf2f2f..aceaf5904b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1122,10 +1122,7 @@ func (o *consumer) setLeader(isLeader bool) { } func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - o.mu.RLock() - sysc := o.sysc - o.mu.RUnlock() - sysc.sendInternalMsg(reply, _EMPTY_, nil, o.info()) + o.infoWithSnapAndReply(false, reply) } // Lock should be held. @@ -2202,23 +2199,27 @@ func (o *consumer) info() *ConsumerInfo { } func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { - o.mu.RLock() + return o.infoWithSnapAndReply(snap, _EMPTY_) +} + +func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { + o.mu.Lock() mset := o.mset if mset == nil || mset.srv == nil { - o.mu.RUnlock() + o.mu.Unlock() return nil } js := o.js - o.mu.RUnlock() - if js == nil { + o.mu.Unlock() return nil } - ci := js.clusterInfo(o.raftGroup()) - - o.mu.Lock() - defer o.mu.Unlock() + // Capture raftGroup. + var rg *raftGroup + if o.ca != nil { + rg = o.ca.Group + } cfg := o.cfg info := &ConsumerInfo{ @@ -2238,7 +2239,6 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { NumRedelivered: len(o.rdc), NumPending: o.streamNumPending(), PushBound: o.isPushMode() && o.active, - Cluster: ci, } // Adjust active based on non-zero etc. Also make UTC here. if !o.ldt.IsZero() { @@ -2259,6 +2259,18 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { if snap { o.ici = info } + sysc := o.sysc + o.mu.Unlock() + + // Do cluster. + if rg != nil { + info.Cluster = js.clusterInfo(rg) + } + + // If we have a reply subject send the response here. + if reply != _EMPTY_ && sysc != nil { + sysc.sendInternalMsg(reply, _EMPTY_, nil, info) + } return info }