Skip to content

Commit

Permalink
Merge pull request #45 from xiangli-cmu/server
Browse files Browse the repository at this point in the history
add server state mutex to avoid state race condition
  • Loading branch information
benbjohnson committed Jul 6, 2013
2 parents 102c93b + 5f64d15 commit 72d2d09
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 46 deletions.
34 changes: 22 additions & 12 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package raft

import (
"errors"
"fmt"
"sync"
"time"
"fmt"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -185,37 +185,40 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
var resp *AppendEntriesResponse

select {
case <-time.After(p.server.heartbeatTimeout):
// how to decide?
case <-time.After(p.server.heartbeatTimeout * 2):
resp = nil

case resp = <-respChan:

}

debugln("receive flush response from ", p.Name())

if resp == nil {
debugln("receive flush timeout from ", p.Name())
return 0, false, fmt.Errorf("AppendEntries timeout: %s", p.Name())
}
debugln("receive flush response from ", p.Name())

// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
debugln("Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
}
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
} else {

if resp.Term > p.server.currentTerm {
return resp.Term, false, errors.New("Step down")
}

// we may miss a response from peer
if resp.CommitIndex > p.prevLogIndex {
if resp.CommitIndex >= p.prevLogIndex {
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
p.prevLogIndex = resp.CommitIndex
} else if p.prevLogIndex > 0 {
debugln("Peer ", p.Name(), "'s' step back to ", p.prevLogIndex)
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
Expand Down Expand Up @@ -257,13 +260,20 @@ func (p *Peer) heartbeat() {
} else {
// shutdown the heartbeat
if f.term > p.server.currentTerm {
debugln("[Heartbeat] SetpDown!")
select {
case p.server.stepDown <- f.term:
return
default:
return
p.server.stateMutex.Lock()

if p.server.state == Leader {
p.server.state = Follower
select {
case p.server.stepDown <- f.term:
p.server.currentTerm = f.term
default:
panic("heartbeat cannot step down")
}
}

p.server.stateMutex.Unlock()
return
}
}

Expand Down
121 changes: 99 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ type Server struct {
context interface{}
currentTerm uint64

votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
stateMutex sync.Mutex

electionTimer *Timer
heartbeatTimeout time.Duration
Expand Down Expand Up @@ -97,7 +98,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
state: Stopped,
peers: make(map[string]*Peer),
log: NewLog(),
stepDown: make(chan uint64),
stepDown: make(chan uint64, 1),
stop: make(chan bool),
electionTimer: NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2),
heartbeatTimeout: DefaultHeartbeatTimeout,
Expand Down Expand Up @@ -220,7 +221,7 @@ func (s *Server) LastCommandName() string {

// GetState returns a human-readable summary of the server for debugging,
// formatted from the server's state, current term and committed index.
// NOTE(review): this view shows two consecutive return statements, which look
// like the before/after lines of a diff. As written, only the first one
// executes; the second, which additionally reports s.name, is unreachable.
// Confirm which line the real file keeps.
// NOTE(review): s.state and s.currentTerm are read here without taking a
// lock — confirm callers tolerate a slightly stale value in debug output.
func (s *Server) GetState() string {
return fmt.Sprintf("State: %s, Term: %v, Index: %v ", s.state, s.currentTerm, s.CommittedIndex())
return fmt.Sprintf("Name: %s, State: %s, Term: %v, Index: %v ", s.name, s.state, s.currentTerm, s.CommittedIndex())
}

//--------------------------------------
Expand Down Expand Up @@ -351,6 +352,7 @@ func (s *Server) StartServerLoop(role string) {
role = Candidate

case Candidate:
debugln(s.GetState() + "start Candiate")
stop, leader = s.startCandidateLoop()

s.votedFor = ""
Expand All @@ -366,14 +368,17 @@ func (s *Server) StartServerLoop(role string) {

role = Follower
}
debugln(s.GetState() + "stop Candiate")

case Leader:
debugln(s.GetState() + "start Leader")
stop = s.startLeaderLoop()
if stop {
return
}

role = Follower
debugln(s.GetState() + "stop Leader")
}
}
}
Expand All @@ -387,13 +392,13 @@ func (s *Server) StartFollower() {
// StartLeader starts the server as a leader.
// It marks the server as a Candidate, increments currentTerm, and then runs
// StartServerLoop(Leader) in a new goroutine, so this call returns without
// waiting for the loop.
// NOTE(review): state and currentTerm are written here without taking any
// lock — confirm nothing else can touch them before the loop starts.
// NOTE(review): this block is shown inside a diff hunk; confirm that the
// currentTerm increment is part of the current version of the file.
func (s *Server) StartLeader() {
// NOTE(review): presumably Candidate is set because the leader loop only
// promotes a server whose state is Candidate — confirm against startLeaderLoop.
s.state = Candidate
s.currentTerm++
go s.StartServerLoop(Leader)
}

// Shuts down the server.
func (s *Server) Stop() {
s.mutex.Lock()

if s.state == Follower {
s.electionTimer.Stop()
} else {
Expand All @@ -413,8 +418,9 @@ func (s *Server) unload() {

// Close the log.
if s.log != nil {
// still some concurrency issue with stop
// need lock
s.log.Close()
s.log = nil
}

}
Expand Down Expand Up @@ -461,6 +467,7 @@ func (s *Server) startFollowerLoop() (stop bool) {

func (s *Server) startCandidateLoop() (stop bool, leader bool) {

// the server must be a follower
if s.state != Follower && s.state != Stopped {
panic("startCandidateLoop")
}
Expand Down Expand Up @@ -499,6 +506,7 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) {
}

if stop {

return true, false
}

Expand All @@ -523,17 +531,36 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) {
// Step down if currentTerm changes (§5.5)
func (s *Server) startLeaderLoop() bool {

if s.state != Candidate && s.state != Stopped {
panic(s.Name() + " promote to leader but not candidate " + s.state)
// when the server goes into this loop,
// the leader may have been stepped down to follower!

// we cannot assume that the server is a candidate when we
// get into this func

// The request vote func may let it step down

// That happens when we receive a majority of the votes, but
// another candidate starts a new term and has not voted for us.
// After it sends its vote request, this server will step down before
// it enters this func

// Move server to become a leader and begin peer heartbeats.
s.stateMutex.Lock()

if s.state == Candidate {
s.state = Leader
s.leader = s.name
} else {

s.stateMutex.Unlock()
return false
}

s.state = Leader
s.stateMutex.Unlock()

logIndex, _ := s.log.LastInfo()

// Move server to become a leader and begin peer heartbeats.
s.state = Leader
s.leader = s.name
// after here we let the leader stepdown in the startLeaderSelect loop

// Update the peers prevLogIndex to leader's lastLogIndex
// Start heartbeat
Expand Down Expand Up @@ -566,6 +593,9 @@ func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool,
// Collect votes until we have a quorum.
votesGranted := 1

debugln(s.GetState() + "start Select")
defer debugln(s.GetState() + "end Select")

for {

// If we received enough votes then stop waiting for more votes.
Expand All @@ -576,26 +606,43 @@ func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool,
// Collect votes from peers.
select {
case resp := <-c:
debugln(s.GetState() + "select recv vote")
if resp != nil {
if resp.VoteGranted == true {
votesGranted++

} else if resp.Term > s.currentTerm {
s.stateMutex.Lock()

// go from internal path
// we may need to eat the stepdown
select {
case <-s.stepDown:

default:

}

s.state = Follower
s.currentTerm = resp.Term
debugln(s.GetState() + "select step down")
s.stateMutex.Unlock()
return false, false, false
}
}

case term := <-s.stepDown:
s.currentTerm = term
case <-s.stepDown:
debugln(s.GetState() + "select step down")
return false, false, false

// TODO: do we calculate the overall timeout? or timeout for each vote?
// Some issue here
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
debugln(s.GetState() + "select timeout")
return false, true, false

case <-s.stop:
debugln(s.GetState() + "select stop")
return false, false, true
}

Expand All @@ -619,9 +666,9 @@ func (s *Server) startLeaderSelect() bool {
count++
}

case term := <-s.stepDown:
case <-s.stepDown:
// stepdown to follower
s.currentTerm = term

return false

case <-s.stop:
Expand Down Expand Up @@ -682,16 +729,34 @@ func (s *Server) startLeaderSelect() bool {
// when the command has been successfully committed or an error has occurred.

func (s *Server) Do(command Command) (interface{}, error) {
// race here
// chance to append entry when we are not leader
// after the check, the leader may stepdown
// but log appended with the newest term
// which means the command from this follower can be committed
// which will cause a panic

s.stateMutex.Lock()
if s.state != Leader {

s.stateMutex.Unlock()

return nil, NotLeaderError
}

entry := s.log.CreateEntry(s.currentTerm, command)
// we get the term of the server
// when we are sure the server is leader
term := s.currentTerm

s.stateMutex.Unlock()

entry := s.log.CreateEntry(term, command)

if err := s.log.AppendEntry(entry); err != nil {
return nil, err
}

s.response <- FlushResponse{s.currentTerm, true, nil, nil}
s.response <- FlushResponse{term, true, nil, nil}

// to speed up the response time
// TODO: think about this carefully
Expand Down Expand Up @@ -843,14 +908,25 @@ func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.votedFor = ""

s.stateMutex.Lock()
if s.state == Leader || s.state == Candidate {
debugln(s.Name(), " should step down to a follower from ", s.state)

s.stepDown <- term
s.state = Follower

select {
case s.stepDown <- term:

default:
panic("cannot stepdown")
}
debugln(s.Name(), " step down to a follower from ", s.state)
s.currentTerm = term
s.stateMutex.Unlock()
return
}

s.stateMutex.Unlock()
// update term after stop all the peer
s.currentTerm = term
}
Expand Down Expand Up @@ -879,6 +955,7 @@ func (s *Server) AddPeer(name string) error {
s.peers[peer.name] = peer

}

return nil
}

Expand Down
Loading

0 comments on commit 72d2d09

Please sign in to comment.