Skip to content

Commit

Permalink
raft: port disruptive test
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Mar 10, 2018
1 parent ba7ddfd commit aac9869
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub struct Raft<T: Storage> {
heartbeat_elapsed: usize,

pub check_quorum: bool,
pre_vote: bool,
pub pre_vote: bool,
skip_bcast_commit: bool,

heartbeat_timeout: usize,
Expand Down Expand Up @@ -952,7 +952,16 @@ impl<T: Storage> Raft<T> {
// two features is to minimize the disruption caused by nodes that have been
// removed from the cluster's configuration: a removed node will send MsgVotes
// which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it
// will not create disruptive term increases
// will not create disruptive term increases, by notifying leader of this node's
// activeness.
// The above comments are also true for Pre-Vote.
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
let to_send = new_message(m.get_from(), MessageType::MsgAppendResponse, None);
self.send(to_send);
} else {
Expand Down
148 changes: 148 additions & 0 deletions tests/cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2260,6 +2260,154 @@ fn test_non_promotable_voter_which_check_quorum() {
assert_eq!(nt.peers[&2].leader_id, 1);
}

// test_disruptive_follower covers a follower that stops hearing from the
// leader because traffic from the leader is slow. Its election timer runs
// out, it turns into a candidate with a bumped term, and its reply to the
// leader's late heartbeat then forces the leader to step down.
#[test]
fn test_disruptive_follower() {
    // Build the three-node cluster. Every node has check_quorum enabled and
    // is put into the follower role at term 1 with no known leader.
    let mut nodes: Vec<_> = (1..4)
        .map(|id| new_test_raft(id, vec![1, 2, 3], 10, 1, new_storage()))
        .collect();
    for node in nodes.iter_mut() {
        node.check_quorum = true;
        node.become_follower(1, INVALID_ID);
    }

    let peers: Vec<_> = nodes.into_iter().map(Some).collect();
    let mut nt = Network::new(peers);

    // Node 1 campaigns and is expected to win the election.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    for &(id, ref role) in [
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Follower),
    ].iter()
    {
        assert_eq!(nt.peers[&id].state, *role);
    }

    // etcd server runs "advanceTicksForElection" on restart to expedite the
    // campaign trigger when election timeouts are large (e.g. multi-datacenter
    // deployments); the same situation arises when leader messages are delayed
    // while ticks keep elapsing. Simulate it by ticking node 3 by hand.
    {
        let n3 = nt.peers.get_mut(&3).unwrap();
        let election_timeout = n3.get_election_timeout();
        n3.set_randomized_election_timeout(election_timeout + 2);
        let ticks = n3.get_randomized_election_timeout();

        // Advance to one tick short of the randomized election timeout.
        for _ in 0..ticks - 1 {
            n3.tick();
        }

        // Ideally node 3 would receive an append or heartbeat from leader
        // node 1 before this last tick and reset its election timer. Here
        // nothing arrives, so the last tick triggers a campaign.
        n3.tick();
    }

    // Node 1 is still the leader, since its heartbeat to candidate node 3 is
    // being delayed; node 3 alone has moved on to term 3.
    for &(id, ref role) in [
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Candidate),
    ].iter()
    {
        assert_eq!(nt.peers[&id].state, *role);
    }
    for &(id, term) in [(1, 2), (2, 2), (3, 3)].iter() {
        assert_eq!(nt.peers[&id].term, term);
    }

    // While node 3's vote requests are still queued, the leader's heartbeat
    // finally reaches it. Because of the delay, that heartbeat carries the
    // leader's term, which is lower than the candidate's.
    let leader_term = nt.peers[&1].term;
    let mut heartbeat = new_message(1, 3, MessageType::MsgHeartbeat, 0);
    heartbeat.set_term(leader_term);
    nt.send(vec![heartbeat]);

    // Candidate node 3 answers with a higher-term response, and the leader
    // steps down on seeing the higher term. This disruption of the current
    // leader is what lets the higher-term candidate be freed by the next
    // election. Node 2 was not involved in the exchange and stays at term 2.
    for &(id, ref role) in [
        (1, StateRole::Follower),
        (2, StateRole::Follower),
        (3, StateRole::Candidate),
    ].iter()
    {
        assert_eq!(nt.peers[&id].state, *role);
    }
    for &(id, term) in [(1, 3), (2, 2), (3, 3)].iter() {
        assert_eq!(nt.peers[&id].term, term);
    }
}

// test_disruptive_follower_pre_vote covers a follower that is cut off from
// the cluster while the leader keeps accepting proposals. When the follower
// campaigns after reconnecting, it only becomes a pre-candidate, and the
// pre-vote phase keeps it from forcing the current leader to step down.
#[test]
fn test_disruptive_follower_pre_vote() {
    // Build the three-node cluster. Every node has check_quorum enabled and
    // is put into the follower role at term 1 with no known leader.
    let mut nodes: Vec<_> = (1..4)
        .map(|id| new_test_raft(id, vec![1, 2, 3], 10, 1, new_storage()))
        .collect();
    for node in nodes.iter_mut() {
        node.check_quorum = true;
        node.become_follower(1, INVALID_ID);
    }

    let peers: Vec<_> = nodes.into_iter().map(Some).collect();
    let mut nt = Network::new(peers);

    // Node 1 campaigns and is expected to win the election.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    for &(id, ref role) in [
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Follower),
    ].iter()
    {
        assert_eq!(nt.peers[&id].state, *role);
    }

    // Isolate node 3 and submit three proposals to the leader in the
    // meantime, so node 3 presumably ends up with less log than the leader.
    nt.isolate(3);
    for _ in 0..3 {
        nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
    }

    // Switch pre-vote on for every node, reconnect node 3, and let it campaign.
    for id in 1..4 {
        nt.peers.get_mut(&id).unwrap().pre_vote = true;
    }
    nt.recover();
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    // Node 3 only reaches the pre-candidate role; node 1 keeps leadership
    // and all three nodes stay at term 2.
    for &(id, ref role) in [
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::PreCandidate),
    ].iter()
    {
        assert_eq!(nt.peers[&id].state, *role);
    }
    for &(id, term) in [(1, 2), (2, 2), (3, 2)].iter() {
        assert_eq!(nt.peers[&id].term, term);
    }

    // A delayed leader heartbeat does not force the current leader to step
    // down.
    let leader_term = nt.peers[&1].term;
    let mut heartbeat = new_message(1, 3, MessageType::MsgHeartbeat, 0);
    heartbeat.set_term(leader_term);
    nt.send(vec![heartbeat]);
    assert_eq!(nt.peers[&1].state, StateRole::Leader);
}

#[test]
fn test_read_only_option_safe() {
let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage());
Expand Down

0 comments on commit aac9869

Please sign in to comment.