diff --git a/server/raft.go b/server/raft.go index f041fb08987..f0be23aecf2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -196,15 +196,14 @@ type raft struct { hcommit uint64 // The commit at the time that applies were paused pobserver bool // Whether we were an observer at the time that applies were paused - prop *ipQueue[*Entry] // Proposals - entry *ipQueue[*appendEntry] // Append entries - resp *ipQueue[*appendEntryResponse] // Append entries responses - apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) - reqs *ipQueue[*voteRequest] // Vote requests - votes *ipQueue[*voteResponse] // Vote responses - stepdown *ipQueue[string] // Stepdown requests - leadc chan bool // Leader changes - quit chan struct{} // Raft group shutdown + prop *ipQueue[*Entry] // Proposals + entry *ipQueue[*appendEntry] // Append entries + resp *ipQueue[*appendEntryResponse] // Append entries responses + apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) + reqs *ipQueue[*voteRequest] // Vote requests + votes *ipQueue[*voteResponse] // Vote responses + leadc chan bool // Leader changes + quit chan struct{} // Raft group shutdown } // cacthupState structure that holds our subscription, and catchup term and index @@ -388,7 +387,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), - stepdown: newIPQueue[string](s, qpfx+"stepdown"), accName: accName, leadc: make(chan bool, 1), observer: cfg.Observer, @@ -863,7 +861,7 @@ func (n *raft) PauseApply() error { // If we are currently a candidate make sure we step down. if n.State() == Candidate { - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) } n.debug("Pausing our apply channel") @@ -1252,6 +1250,21 @@ func (n *raft) Leader() bool { return n.State() == Leader } +// stepdown immediately steps down the Raft node to the +// follower state. This will take the lock itself. +func (n *raft) stepdown(newLeader string) { + n.Lock() + defer n.Unlock() + n.stepdownLocked(newLeader) +} + +// stepdownLocked immediately steps down the Raft node to the +// follower state. This requires the lock is already held. +func (n *raft) stepdownLocked(newLeader string) { + n.debug("Stepping down") + n.switchToFollowerLocked(newLeader) +} + // isCatchingUp returns true if a catchup is currently taking place. func (n *raft) isCatchingUp() bool { n.RLock() @@ -1459,7 +1472,6 @@ func (n *raft) StepDown(preferred ...string) error { n.vote = noVote n.writeTermVote() - stepdown := n.stepdown prop := n.prop n.Unlock() @@ -1473,8 +1485,7 @@ func (n *raft) StepDown(preferred ...string) error { prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader))) } else { // Force us to stepdown here. - n.debug("Stepping down") - stepdown.push(noLeader) + n.stepdown(noLeader) } return nil @@ -1651,7 +1662,7 @@ func (n *raft) shutdown(shouldDelete bool) { // just will remove them from the central monitoring map queues := []interface { unregister() - }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown} + }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} for _, q := range queues { q.unregister() } @@ -1913,7 +1924,7 @@ func (n *raft) processAppendEntries() { // runAsFollower is called by run and will block for as long as the node is // running in the follower state. func (n *raft) runAsFollower() { - for { + for n.State() == Follower { elect := n.electTimer() select { @@ -1964,13 +1975,6 @@ func (n *raft) runAsFollower() { if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - // We've received a stepdown request, start following the new leader if - // we can. - if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } } } } @@ -2307,14 +2311,14 @@ func (n *raft) runAsLeader() { fsub, err := n.subscribe(psubj, n.handleForwardedProposal) if err != nil { n.warn("Error subscribing to forwarded proposals: %v", err) - n.stepdown.push(noLeader) + n.stepdown(noLeader) return } rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal) if err != nil { n.warn("Error subscribing to forwarded remove peer proposals: %v", err) n.unsubscribe(fsub) - n.stepdown.push(noLeader) + n.stepdown(noLeader) return } @@ -2369,7 +2373,7 @@ func (n *raft) runAsLeader() { if b.Type == EntryLeaderTransfer { n.prop.recycle(&es) n.debug("Stepping down due to leadership transfer") - n.switchToFollower(noLeader) + n.stepdown(noLeader) return } // We need to re-create `entries` because there is a reference @@ -2384,7 +2388,7 @@ func (n *raft) runAsLeader() { } case <-lq.C: if n.lostQuorum() { - n.switchToFollower(noLeader) + n.stepdown(noLeader) return } case <-n.votes.ch: @@ -2394,7 +2398,7 @@ func (n *raft) runAsLeader() { continue } if vresp.term > n.Term() { - n.switchToFollower(noLeader) + n.stepdown(noLeader) return } n.trackPeer(vresp.peer) @@ -2403,11 +2407,6 @@ func (n *raft) runAsLeader() { if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } case <-n.entry.ch: n.processAppendEntries() } @@ -2576,7 +2575,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) { snap, err := n.loadLastSnapshot() if err != nil { // We need to stepdown here when this happens. - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) // We need to reset our state here as well. n.resetWAL() return 0, err @@ -2642,7 +2641,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err) if err == ErrStoreEOF { // If we are here we are seeing a request for an item beyond our state, meaning we should stepdown. - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() arPool.Put(ar) return @@ -2654,7 +2653,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { // If we are here we are seeing a request for an item we do not have, meaning we should stepdown. // This is possible on a reset of our WAL but the other side has a snapshot already. // If we do not stepdown this can cycle. - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() arPool.Put(ar) return @@ -2711,7 +2710,7 @@ func (n *raft) applyCommit(index uint64) error { if err != ErrStoreClosed && err != ErrStoreEOF { n.warn("Got an error loading %d index: %v - will reset", index, err) if n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdownLocked(n.selectNextLeader()) } // Reset and cancel any catchup. n.resetWAL() @@ -2789,7 +2788,7 @@ func (n *raft) applyCommit(index uint64) error { // If this is us and we are the leader we should attempt to stepdown. if peer == n.id && n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdown(n.selectNextLeader()) } // Remove from string intern map. @@ -2922,7 +2921,7 @@ func (n *raft) runAsCandidate() { // We vote for ourselves. votes := 1 - for { + for n.State() == Candidate { elect := n.electTimer() select { case <-n.entry.ch: @@ -2965,8 +2964,8 @@ func (n *raft) runAsCandidate() { n.term = vresp.term n.vote = noVote n.writeTermVote() - n.stepdown.push(noLeader) n.lxfer = false + n.stepdownLocked(noLeader) n.Unlock() } case <-n.reqs.ch: @@ -2974,11 +2973,6 @@ func (n *raft) runAsCandidate() { if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } } } } @@ -3132,7 +3126,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.vote = noVote n.writeTermVote() n.debug("Received append entry from another leader, stepping down to %q", ae.leader) - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } else { // Let them know we are the leader. ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) @@ -3155,7 +3149,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.vote = noVote n.writeTermVote() } - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } // Catching up state. @@ -3217,7 +3211,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } if n.State() != Follower { n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } } @@ -3458,7 +3452,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { n.vote = noVote n.writeTermVote() n.warn("Detected another leader with higher term, will stepdown and reset") - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.resetWAL() n.Unlock() arPool.Put(ar) @@ -3506,7 +3500,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { if index := ae.pindex + 1; index != seq { n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex) if n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdownLocked(n.selectNextLeader()) } // Reset and cancel any catchup. n.resetWAL() @@ -3933,7 +3927,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { if n.State() != Follower { n.debug("Stepping down from %s, detected higher term: %d vs %d", strings.ToLower(n.State().String()), vr.term, n.term) - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.term = vr.term } n.vote = noVote @@ -4064,13 +4058,17 @@ const ( ) func (n *raft) switchToFollower(leader string) { + n.Lock() + defer n.Unlock() + + n.switchToFollowerLocked(leader) +} + +func (n *raft) switchToFollowerLocked(leader string) { if n.State() == Closed { return } - n.Lock() - defer n.Unlock() - n.debug("Switching to follower") n.lxfer = false