Skip to content

Commit

Permalink
Simplify locking for consumer info requests.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Dec 15, 2022
1 parent a681945 commit 7862984
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,10 +1122,7 @@ func (o *consumer) setLeader(isLeader bool) {
}

func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
o.mu.RLock()
sysc := o.sysc
o.mu.RUnlock()
sysc.sendInternalMsg(reply, _EMPTY_, nil, o.info())
o.infoWithSnapAndReply(false, reply)
}

// Lock should be held.
Expand Down Expand Up @@ -2202,23 +2199,27 @@ func (o *consumer) info() *ConsumerInfo {
}

func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
o.mu.RLock()
return o.infoWithSnapAndReply(snap, _EMPTY_)
}

func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
o.mu.Lock()
mset := o.mset
if mset == nil || mset.srv == nil {
o.mu.RUnlock()
o.mu.Unlock()
return nil
}
js := o.js
o.mu.RUnlock()

if js == nil {
o.mu.Unlock()
return nil
}

ci := js.clusterInfo(o.raftGroup())

o.mu.Lock()
defer o.mu.Unlock()
// Capture raftGroup.
var rg *raftGroup
if o.ca != nil {
rg = o.ca.Group
}

cfg := o.cfg
info := &ConsumerInfo{
Expand All @@ -2238,7 +2239,6 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.streamNumPending(),
PushBound: o.isPushMode() && o.active,
Cluster: ci,
}
// Adjust active based on non-zero etc. Also make UTC here.
if !o.ldt.IsZero() {
Expand All @@ -2259,6 +2259,18 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
if snap {
o.ici = info
}
sysc := o.sysc
o.mu.Unlock()

// Do cluster.
if rg != nil {
info.Cluster = js.clusterInfo(rg)
}

// If we have a reply subject send the response here.
if reply != _EMPTY_ && sysc != nil {
sysc.sendInternalMsg(reply, _EMPTY_, nil, info)
}

return info
}
Expand Down

0 comments on commit 7862984

Please sign in to comment.