Skip to content

Commit

Permalink
feature: Membership provides method is_majority() and simplify quorum…
Browse files Browse the repository at this point in the history
… calculation for voting
  • Loading branch information
drmingdrmer committed Dec 25, 2021
1 parent 6c0ccaf commit 1451f96
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 54 deletions.
28 changes: 8 additions & 20 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ mod vote;
mod startup_test;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use futures::future::AbortHandle;
use futures::future::Abortable;
use maplit::btreeset;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
Expand Down Expand Up @@ -832,24 +834,18 @@ pub fn is_matched_upto_date(matched: &LogId, last_log_id: &LogId, config: &Confi
/// Volatile state specific to a Raft node in candidate state.
struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
core: &'a mut RaftCore<D, R, N, S>,
/// The number of votes which have been granted by peer nodes of the old (current) config group.
votes_granted_old: u64,
/// The number of votes needed from the old (current) config group in order to become the Raft leader.
votes_needed_old: u64,
/// The number of votes which have been granted by peer nodes of the new config group (if applicable).
votes_granted_new: u64,
/// The number of votes needed from the new config group in order to become the Raft leader (if applicable).
votes_needed_new: u64,

/// Ids of the nodes that has granted our vote request.
granted: BTreeSet<NodeId>,
}

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> CandidateState<'a, D, R, N, S> {
pub(self) fn new(core: &'a mut RaftCore<D, R, N, S>) -> Self {
let id = core.id;
Self {
core,
votes_granted_old: 0,
votes_needed_old: 0,
votes_granted_new: 0,
votes_needed_new: 0,
// vote for itself.
granted: btreeset! {id},
}
}

Expand All @@ -862,14 +858,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}

// Setup initial state per term.
self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_old = ((self.core.membership.membership.get_ith_config(0).unwrap().len() / 2) + 1) as u64; // Just need a majority.
if let Some(nodes) = self.core.membership.membership.get_ith_config(1) {
self.votes_granted_new = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_new = ((nodes.len() / 2) + 1) as u64; // Just need a majority.
}

// Setup new term.
self.core.update_next_election_timeout(false); // Generates a new rand value within range.
self.core.current_term += 1;
Expand Down
14 changes: 3 additions & 11 deletions async-raft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}

// If peer granted vote, then update campaign state.
if res.vote_granted {
// Handle vote responses from the C0 config group.
if self.core.membership.membership.is_in_ith_config(0, &target) {
self.votes_granted_old += 1;
}
// Handle vote responses from members of C1 config group.
if self.core.membership.membership.is_in_ith_config(1, &target) {
self.votes_granted_new += 1;
}
// If we've received enough votes from both config groups, then transition to leader state`.
if self.votes_granted_old >= self.votes_needed_old && self.votes_granted_new >= self.votes_needed_new {
self.granted.insert(target);

if self.core.membership.membership.is_majority(&self.granted) {
tracing::debug!("transitioning to leader state as minimum number of votes have been received");
self.core.set_target_state(State::Leader);
return Ok(());
Expand Down
40 changes: 25 additions & 15 deletions async-raft/src/membership_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@ fn test_membership() -> anyhow::Result<()> {
assert_eq!(None, m123.get_ith_config(1).cloned());
assert_eq!(Some(btreeset! {3,4,5}), m123_345.get_ith_config(1).cloned());

assert!(m1.is_in_ith_config(0, &1));
assert!(!m1.is_in_ith_config(0, &2));
assert!(!m1.is_in_ith_config(1, &1));
assert!(!m1.is_in_ith_config(1, &2));

assert!(m123.is_in_ith_config(0, &1));
assert!(m123.is_in_ith_config(0, &2));
assert!(!m123.is_in_ith_config(1, &1));
assert!(!m123.is_in_ith_config(1, &2));

assert!(m123_345.is_in_ith_config(0, &1));
assert!(m123_345.is_in_ith_config(0, &2));
assert!(!m123_345.is_in_ith_config(1, &1));
assert!(m123_345.is_in_ith_config(1, &4));

assert_eq!(vec![1], m1.ith_config(0));
assert_eq!(vec![1, 2, 3], m123.ith_config(0));
assert_eq!(vec![1, 2, 3], m123_345.ith_config(0));
Expand Down Expand Up @@ -85,3 +70,28 @@ fn test_membership_update() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn test_membership_majority() -> anyhow::Result<()> {
{
let m12345 = MembershipConfig::new_single(btreeset! {1,2,3,4,5});
assert!(!m12345.is_majority(&btreeset! {0}));
assert!(!m12345.is_majority(&btreeset! {0,1,2}));
assert!(!m12345.is_majority(&btreeset! {6,7,8}));
assert!(m12345.is_majority(&btreeset! {1,2,3}));
assert!(m12345.is_majority(&btreeset! {3,4,5}));
assert!(m12345.is_majority(&btreeset! {1,3,4,5}));
}

{
let m12345_123 = MembershipConfig::new_multi(vec![btreeset! {1,2,3,4,5}, btreeset! {6,7,8}]);
assert!(!m12345_123.is_majority(&btreeset! {0}));
assert!(!m12345_123.is_majority(&btreeset! {0,1,2}));
assert!(!m12345_123.is_majority(&btreeset! {6,7,8}));
assert!(!m12345_123.is_majority(&btreeset! {1,2,3}));
assert!(m12345_123.is_majority(&btreeset! {1,2,3,6,7}));
assert!(m12345_123.is_majority(&btreeset! {1,2,3,4,7,8}));
}

Ok(())
}
30 changes: 22 additions & 8 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::error::RaftError;
use crate::error::RaftResult;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::quorum;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
Expand Down Expand Up @@ -657,14 +658,6 @@ impl MembershipConfig {
self.configs.get(i)
}

pub fn is_in_ith_config(&self, i: usize, id: &u64) -> bool {
if let Some(c) = self.configs.get(i) {
c.contains(id)
} else {
false
}
}

pub fn ith_config(&self, i: usize) -> Vec<NodeId> {
self.configs[i].iter().cloned().collect()
}
Expand Down Expand Up @@ -699,6 +692,27 @@ impl MembershipConfig {
MembershipConfig::new_single(last)
}

/// Return true if the given set of ids constitutes a majority.
///
/// I.e. the id set includes a majority of every config.
pub fn is_majority(&self, granted: &BTreeSet<NodeId>) -> bool {
for config in self.configs.iter() {
if !Self::is_majority_of_single_config(granted, config) {
return false;
}
}

true
}

fn is_majority_of_single_config(granted: &BTreeSet<NodeId>, single_config: &BTreeSet<NodeId>) -> bool {
let d = granted.intersection(single_config);
let n_granted = d.fold(0, |a, _x| a + 1);

let majority = quorum::majority_of(single_config.len());
n_granted >= majority
}

fn build_all_nodes(configs: &[BTreeSet<NodeId>]) -> BTreeSet<NodeId> {
let mut nodes = BTreeSet::new();
for config in configs.iter() {
Expand Down

0 comments on commit 1451f96

Please sign in to comment.