Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Avoid scanning raft log in become_leader #15

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,13 @@ pub struct Raft<T: Storage> {
/// Follow the procedure defined in raft thesis 3.10.
pub lead_transferee: Option<u64>,

/// New configuration is ignored if there exists unapplied configuration.
pub pending_conf: bool,
/// Only one conf change may be pending (in the log, but not yet
/// applied) at a time. This is enforced via pending_conf_index, which
/// is set to a value >= the log index of the latest pending
/// configuration change (if any). Config changes are only allowed to
/// be proposed if the leader's applied index is greater than this
/// value.
pub pending_conf_index: u64,

pub read_only: ReadOnly,

Expand Down Expand Up @@ -311,7 +316,7 @@ impl<T: Storage> Raft<T> {
lead_transferee: None,
term: Default::default(),
election_elapsed: Default::default(),
pending_conf: Default::default(),
pending_conf_index: Default::default(),
before_step_state: None,
vote: Default::default(),
heartbeat_elapsed: Default::default(),
Expand Down Expand Up @@ -639,7 +644,7 @@ impl<T: Storage> Raft<T> {

self.votes = FlatMap::default();

self.pending_conf = false;
self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);

let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
Expand Down Expand Up @@ -780,14 +785,13 @@ impl<T: Storage> Raft<T> {
let ents = self.raft_log
.entries(begin, raft_log::NO_LIMIT)
.expect("unexpected error getting uncommitted entries");
let nconf = self.num_pending_conf(&ents);
if nconf > 1 {
panic!("{} unexpected double uncommitted config entry", self.tag);
}

if nconf == 1 {
self.pending_conf = true;
}
// Conservatively set the pending_conf_index to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
self.pending_conf_index = ents.last().map_or(0, |e| e.get_index());

self.append_entry(&mut [Entry::new()]);
info!("{} became leader at term {}", self.tag, self.term);
}
Expand Down Expand Up @@ -1387,18 +1391,20 @@ impl<T: Storage> Raft<T> {
return;
}

for e in m.mut_entries().iter_mut() {
if e.get_entry_type() == EntryType::EntryConfChange {
if self.pending_conf {
for (i, e) in m.mut_entries().iter_mut().enumerate() {

if e.get_entry_type() == EntryType::EntryConfChange {
if self.pending_conf_index > self.raft_log.applied {
info!(
"propose conf {:?} ignored since pending unapplied \
configuration",
e
configuration [index {}, applied {}]",
e, self.pending_conf_index, self.raft_log.applied
);
*e = Entry::new();
e.set_entry_type(EntryType::EntryNormal);
} else {
self.pending_conf_index = self.raft_log.last_index() + i as u64 + 1;
}
self.pending_conf = true;
}
}
self.append_entry(&mut m.mut_entries());
Expand Down Expand Up @@ -1819,7 +1825,7 @@ impl<T: Storage> Raft<T> {
}

pub fn should_bcast_commit(&self) -> bool {
!self.skip_bcast_commit || self.pending_conf
!self.skip_bcast_commit || (self.pending_conf_index > self.raft_log.applied)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any tests to cover this?

}

// promotable indicates whether state machine can be promoted to leader,
Expand All @@ -1829,7 +1835,6 @@ impl<T: Storage> Raft<T> {
}

fn add_voter_or_learner(&mut self, id: u64, is_learner: bool) {
self.pending_conf = false;
if self.prs().voters().contains_key(&id) {
if is_learner {
info!(
Expand Down Expand Up @@ -1870,7 +1875,6 @@ impl<T: Storage> Raft<T> {

pub fn remove_node(&mut self, id: u64) {
self.mut_prs().remove(id);
self.pending_conf = false;

// do not try to commit or abort transferring if there are no nodes in the cluster.
if self.prs().voters().is_empty() && self.prs().learners().is_empty() {
Expand All @@ -1888,10 +1892,6 @@ impl<T: Storage> Raft<T> {
}
}

pub fn reset_pending_conf(&mut self) {
self.pending_conf = false;
}

pub fn set_progress(&mut self, id: u64, matched: u64, next_idx: u64, is_learner: bool) {
let mut p = new_progress(next_idx, self.max_inflight);
p.matched = matched;
Expand Down
1 change: 0 additions & 1 deletion src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl<T: Storage> RawNode<T> {

pub fn apply_conf_change(&mut self, cc: &ConfChange) -> ConfState {
if cc.get_node_id() == INVALID_ID {
self.raft.reset_pending_conf();
let mut cs = ConfState::new();
cs.set_nodes(self.raft.prs().nodes());
return cs;
Expand Down
56 changes: 18 additions & 38 deletions tests/cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2937,7 +2937,6 @@ fn test_step_config() {
m.mut_entries().push(e);
r.step(m).expect("");
assert_eq!(r.raft_log.last_index(), index + 1);
assert!(r.pending_conf);
}

// test_step_ignore_config tests that if raft step the second msgProp in
Expand All @@ -2955,62 +2954,49 @@ fn test_step_ignore_config() {
m.mut_entries().push(e);
r.step(m.clone()).expect("");
let index = r.raft_log.last_index();
let pending_conf = r.pending_conf;
let pending_conf_index = r.pending_conf_index;
r.step(m.clone()).expect("");
let mut we = empty_entry(1, 3);
we.set_entry_type(EntryType::EntryNormal);
let wents = vec![we];
let entries = r.raft_log.entries(index + 1, NO_LIMIT).expect("");
assert_eq!(entries, wents);
assert_eq!(r.pending_conf, pending_conf);
assert_eq!(r.pending_conf_index, pending_conf_index);
}

// test_recover_pending_config tests that new leader recovers its pendingConf flag
// test_new_leader_pending_config tests that new leader sets its pending_conf_index
// based on uncommitted entries.
#[test]
fn test_recover_pending_config() {
fn test_new_leader_pending_config() {
let mut tests = vec![
(EntryType::EntryNormal, false),
(EntryType::EntryConfChange, true),
(false, 0),
(true, 1),
];
for (i, (ent_type, wpending)) in tests.drain(..).enumerate() {
for (i, (add_entry, wpending_index)) in tests.drain(..).enumerate() {
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
let mut e = Entry::new();
e.set_entry_type(ent_type);
r.append_entry(&mut [e]);
if add_entry {
e.set_entry_type(EntryType::EntryNormal);
r.append_entry(&mut [e]);
}
r.become_candidate();
r.become_leader();
if r.pending_conf != wpending {
if r.pending_conf_index != wpending_index{
panic!(
"#{}: pending_conf = {}, want {}",
"#{}: pending_conf_index = {}, want {}",
i,
r.pending_conf,
wpending
r.pending_conf_index,
wpending_index
);
}
}
}

// test_recover_double_pending_config tests that new leader will panic if
// there exist two uncommitted config entries.
#[test]
fn test_recover_double_pending_config() {
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
let mut e = Entry::new();
e.set_entry_type(EntryType::EntryConfChange);
r.append_entry(&mut [e.clone()]);
r.append_entry(&mut [e]);
r.become_candidate();
assert!(panic::catch_unwind(AssertUnwindSafe(|| r.become_leader())).is_err());
}

// test_add_node tests that addNode could update pendingConf and nodes correctly.
// test_add_node tests that add_node could update nodes correctly.
#[test]
fn test_add_node() {
let mut r = new_test_raft(1, vec![1], 10, 1, new_storage());
r.pending_conf = true;
r.add_node(2);
assert!(!r.pending_conf);
assert_eq!(r.prs().nodes(), vec![1, 2]);
}

Expand All @@ -3019,9 +3005,7 @@ fn test_add_node() {
#[test]
fn test_remove_node() {
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
r.pending_conf = true;
r.remove_node(2);
assert!(!r.pending_conf);
assert_eq!(r.prs().nodes(), vec![1]);

// remove all nodes from cluster
Expand Down Expand Up @@ -3691,26 +3675,22 @@ fn test_learner_receive_snapshot() {
assert_eq!(n1_committed, n2_committed);
}

// TestAddLearner tests that addLearner could update pendingConf and nodes correctly.
// TestAddLearner tests that addLearner could update nodes correctly.
#[test]
fn test_add_learner() {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage());
n1.pending_conf = true;
n1.add_learner(2);
assert!(!n1.pending_conf);

assert_eq!(n1.prs().nodes(), vec![1, 2]);
assert!(n1.prs().learners()[&2].is_learner);
}

// TestRemoveLearner tests that removeNode could update pendingConf, nodes and
// TestRemoveLearner tests that removeNode could update nodes and
// and removed list correctly.
#[test]
fn test_remove_learner() {
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage());
n1.pending_conf = true;
n1.remove_node(2);
assert!(!n1.pending_conf);
assert_eq!(n1.prs().nodes(), vec![1]);

n1.remove_node(1);
Expand Down
5 changes: 5 additions & 0 deletions tests/cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ fn test_skip_bcast_commit() {
// elect r1 as leader
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// Skip bcast commit when pending_conf_index <= applied
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we check the result of should_bcast_commit here?

assert!(nt.peers[&1].pending_conf_index <= nt.peers[&1].raft_log.get_applied());
assert!(nt.peers[&2].pending_conf_index <= nt.peers[&2].raft_log.get_applied());
assert!(nt.peers[&3].pending_conf_index <= nt.peers[&3].raft_log.get_applied());

// Without bcast commit, followers will not update its commit index immediately.
let mut test_entries = Entry::new();
test_entries.set_data(b"testdata".to_vec());
Expand Down