Skip to content

Commit

Permalink
NRG: Remove stepdown channel, handle inline
Browse files Browse the repository at this point in the history
The stepdown channel interleaves with other channels, such as the apply queue,
leader change notifications, etc., in the `runAs` goroutines in an unpredictable
order, so processing a stepdown request might be delayed behind other work.
Doing this inline should be safer with stronger guarantees.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored and wallyqs committed Feb 14, 2024
1 parent 89b686d commit fb6640a
Showing 1 changed file with 52 additions and 54 deletions.
106 changes: 52 additions & 54 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,14 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
}

// catchupState structure that holds our subscription, and catchup term and index
Expand Down Expand Up @@ -388,7 +387,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
Expand Down Expand Up @@ -863,7 +861,7 @@ func (n *raft) PauseApply() error {

// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
}

n.debug("Pausing our apply channel")
Expand Down Expand Up @@ -1252,6 +1250,21 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}

// stepdown immediately steps down the Raft node to the follower state
// by calling stepdownLocked while holding the node's lock. newLeader is
// passed through unchanged; call sites in this file supply either a
// leader ID or noLeader.
//
// This will take the lock itself, so it is meant for callers that do
// not already hold it.
// NOTE(review): n.Lock is presumably a non-reentrant sync mutex — if
// so, calling this with the lock already held will deadlock; use
// stepdownLocked in that case. Confirm against the raft struct.
func (n *raft) stepdown(newLeader string) {
n.Lock()
// Deferred so the lock is released on every exit path.
defer n.Unlock()
n.stepdownLocked(newLeader)
}

// stepdownLocked immediately steps down the Raft node to the
// follower state. This requires the lock is already held; it neither
// acquires nor releases it.
//
// It writes a "Stepping down" debug log line and then hands newLeader
// unchanged to switchToFollowerLocked. The log line is written before
// that call, so it appears regardless of what the callee does.
func (n *raft) stepdownLocked(newLeader string) {
n.debug("Stepping down")
n.switchToFollowerLocked(newLeader)
}

// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
n.RLock()
Expand Down Expand Up @@ -1459,7 +1472,6 @@ func (n *raft) StepDown(preferred ...string) error {
n.vote = noVote
n.writeTermVote()

stepdown := n.stepdown
prop := n.prop
n.Unlock()

Expand All @@ -1473,8 +1485,7 @@ func (n *raft) StepDown(preferred ...string) error {
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
} else {
// Force us to stepdown here.
n.debug("Stepping down")
stepdown.push(noLeader)
n.stepdown(noLeader)
}

return nil
Expand Down Expand Up @@ -1651,7 +1662,7 @@ func (n *raft) shutdown(shouldDelete bool) {
// just will remove them from the central monitoring map
queues := []interface {
unregister()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.unregister()
}
Expand Down Expand Up @@ -1913,7 +1924,7 @@ func (n *raft) processAppendEntries() {
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for {
for n.State() == Follower {
elect := n.electTimer()

select {
Expand Down Expand Up @@ -1964,13 +1975,6 @@ func (n *raft) runAsFollower() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -2307,14 +2311,14 @@ func (n *raft) runAsLeader() {
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}

Expand Down Expand Up @@ -2369,7 +2373,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryLeaderTransfer {
n.prop.recycle(&es)
n.debug("Stepping down due to leadership transfer")
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
// We need to re-create `entries` because there is a reference
Expand All @@ -2384,7 +2388,7 @@ func (n *raft) runAsLeader() {
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
case <-n.votes.ch:
Expand All @@ -2394,7 +2398,7 @@ func (n *raft) runAsLeader() {
continue
}
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
Expand All @@ -2403,11 +2407,6 @@ func (n *raft) runAsLeader() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
case <-n.entry.ch:
n.processAppendEntries()
}
Expand Down Expand Up @@ -2576,7 +2575,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
snap, err := n.loadLastSnapshot()
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
Expand Down Expand Up @@ -2642,7 +2641,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err)
if err == ErrStoreEOF {
// If we are here we are seeing a request for an item beyond our state, meaning we should stepdown.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand All @@ -2654,7 +2653,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand Down Expand Up @@ -2711,7 +2710,7 @@ func (n *raft) applyCommit(index uint64) error {
if err != ErrStoreClosed && err != ErrStoreEOF {
n.warn("Got an error loading %d index: %v - will reset", index, err)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -2789,7 +2788,7 @@ func (n *raft) applyCommit(index uint64) error {

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdown(n.selectNextLeader())
}

// Remove from string intern map.
Expand Down Expand Up @@ -2922,7 +2921,7 @@ func (n *raft) runAsCandidate() {
// We vote for ourselves.
votes := 1

for {
for n.State() == Candidate {
elect := n.electTimer()
select {
case <-n.entry.ch:
Expand Down Expand Up @@ -2965,20 +2964,15 @@ func (n *raft) runAsCandidate() {
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.stepdown.push(noLeader)
n.lxfer = false
n.stepdownLocked(noLeader)
n.Unlock()
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -3132,7 +3126,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
Expand All @@ -3155,7 +3149,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
}
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}

// Catching up state.
Expand Down Expand Up @@ -3217,7 +3211,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3458,7 +3452,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.vote = noVote
n.writeTermVote()
n.warn("Detected another leader with higher term, will stepdown and reset")
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.resetWAL()
n.Unlock()
arPool.Put(ar)
Expand Down Expand Up @@ -3506,7 +3500,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if index := ae.pindex + 1; index != seq {
n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -3933,7 +3927,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
if n.State() != Follower {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.term = vr.term
}
n.vote = noVote
Expand Down Expand Up @@ -4064,13 +4058,17 @@ const (
)

// switchToFollower acquires the node's lock, calls
// switchToFollowerLocked(leader), and releases the lock on return.
// Callers that already hold the lock should call
// switchToFollowerLocked directly instead.
func (n *raft) switchToFollower(leader string) {
n.Lock()
// Deferred so the lock is released on every exit path.
defer n.Unlock()

n.switchToFollowerLocked(leader)
}

func (n *raft) switchToFollowerLocked(leader string) {
if n.State() == Closed {
return
}

n.Lock()
defer n.Unlock()

n.debug("Switching to follower")

n.lxfer = false
Expand Down

0 comments on commit fb6640a

Please sign in to comment.