diff --git a/anchor/common/qbft/src/config.rs b/anchor/common/qbft/src/config.rs index f8b8c94c..9e153a01 100644 --- a/anchor/common/qbft/src/config.rs +++ b/anchor/common/qbft/src/config.rs @@ -153,6 +153,7 @@ where leader_fn, } } + pub fn operator_id(&self) -> OperatorId { self.operator_id } diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 69bc736c..0a4f71a2 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -100,7 +100,12 @@ where /// Once we have achieved consensus on a PREPARE round, we add the data to mapping to match /// against later. fn insert_consensus(&mut self, round: Round, hash: D::Hash) { - debug!(round = *round, ?hash, "Reached prepare consensus"); + debug!( + self = *self.config.operator_id(), + round = *round, + ?hash, + "Reached prepare consensus" + ); if let Some(past_data) = self.past_consensus.insert(round, hash.clone()) { warn!(round = *round, ?hash, past_data = ?past_data, "Adding duplicate consensus data"); } @@ -162,12 +167,21 @@ where // Handles the beginning of a round. fn start_round(&mut self) { - debug!(round = *self.current_round, "Starting new round"); + debug!( + self = *self.config.operator_id(), + round = *self.current_round, + "Starting new round" + ); // Remove round change messages that would be for previous rounds self.round_change_messages .retain(|&round, _value| round >= self.current_round); + // We are waiting for consensus on a round change, do not start the round yet + if matches!(self.state, InstanceState::SentRoundChange) { + return; + } + // Initialise the instance state for the round self.state = InstanceState::AwaitingProposal; @@ -253,7 +267,11 @@ where return; }; - debug!(from = *operator_id, "PROPOSE received"); + debug!( + self = *self.config.operator_id(), + from = *operator_id, + "PROPOSE received" + ); let hash = consensus_data.data.data.hash(); // Justify the proposal by checking the round changes @@ -315,7 +333,11 @@ where return; }; - debug!(from = *operator_id, "PREPARE received"); + debug!( + self = *self.config.operator_id(), + from = *operator_id, + "PREPARE received" + ); // Store the prepare message if !self @@ -349,8 +371,8 @@ where // Send the data if let Some(data) = update_data { - self.send_commit(data.clone()); - self.insert_consensus(self.current_round, data); + self.insert_consensus(self.current_round, data.clone()); + self.send_commit(data); } } @@ -367,7 +389,7 @@ where // Check that we are awaiting a proposal if (self.state as u8) >= (InstanceState::SentRoundChange as u8) { - warn!(from=*operator_id, ?self.state, "COMMIT message while in invalid state"); + warn!(self = *self.config.operator_id(), from = *operator_id, ?self.state, "COMMIT message while in invalid state"); return; } @@ -392,7 +414,11 @@ where return; }; - debug!(from = *operator_id, "COMMIT received"); + debug!( + self = *self.config.operator_id(), + from = *operator_id, + "COMMIT received" + ); // Store the received commit message if !self @@ -416,7 +442,13 @@ where if operators.len() >= self.config.quorum_size() && matches!(self.state, InstanceState::Commit) { + debug!( + self = *self.config.operator_id(), + round = *self.current_round, + "Reached commit consensus" + ); self.completed = Some(Completed::Success(data.clone())); + debug!(self = *self.config.operator_id(), round = *self.current_round, data = ?self.completed, "Consensus reached"); self.state = InstanceState::Complete; } } @@ -475,7 +507,12 @@ where None => None, }; */ - debug!(from = *operator_id, "ROUNDCHANGE received"); + debug!( + self = *self.config.operator_id(), + from = *operator_id, + current_round = *self.current_round, + "ROUNDCHANGE received" + ); // Store the round change message, for the round the message references if self @@ -499,17 +536,26 @@ where && matches!(self.state, InstanceState::SentRoundChange) { // 1. If we have reached a quorum for this round, advance to that round. - debug!( - operator_id = ?self.config.operator_id(), - round = *round, - "Round change quorum reached" - ); + debug!(operator_id = ?self.config.operator_id(), round = *round, "Round change quorum reached"); + + // We have reached consensus on a round change, we can start a new round now + self.state = InstanceState::RoundChangeConsensus; + self.set_round(round); } else if new_round_messages.len() > self.config.get_f() && !(matches!(self.state, InstanceState::SentRoundChange)) { // 2. We have seen 2f + 1 messtages for this round. - self.send_round_change(round); + + // Only send a round change messages if we have not already previously sent one for + // this round + if let Some(round_msgs) = self.round_change_messages.get(&round) { + if !round_msgs.contains_key(&operator_id) { + self.send_round_change(round); + } + } else { + self.send_round_change(round); + } } } } @@ -544,7 +590,7 @@ where fn send_prepare(&mut self, data: D::Hash) { self.state = InstanceState::Prepare; - debug!(?self.state, "State Changed"); + debug!(self = *self.config.operator_id(), ?self.state, "State changed"); let consensus_data = ConsensusData { round: self.current_round, data, @@ -556,7 +602,7 @@ where fn send_commit(&mut self, data: D::Hash) { self.state = InstanceState::Commit; - debug!(?self.state, "State changed"); + debug!(self = *self.config.operator_id(), ?self.state, "State changed"); let consensus_data = ConsensusData { round: self.current_round, data, @@ -568,7 +614,7 @@ where fn send_round_change(&mut self, round: Round) { self.state = InstanceState::SentRoundChange; - debug!(state = ?self.state, "New State"); + debug!(self = *self.config.operator_id(), ?self.state, "State changed"); // Get the maximum round we have come to consensus on let best_consensus = self @@ -580,6 +626,12 @@ where data: data.clone(), }); + debug!( + from = *self.config.operator_id(), + current_round = *self.current_round, + "Sending ROUNDCHANGE" + ); + let operator_id = self.config.operator_id(); (self.send_message)(Message::RoundChange( operator_id, diff --git a/anchor/common/qbft/src/tests.rs b/anchor/common/qbft/src/tests.rs index 2d0361d5..7d4924a3 100644 --- a/anchor/common/qbft/src/tests.rs +++ b/anchor/common/qbft/src/tests.rs @@ -5,7 +5,7 @@ use super::*; use crate::validation::{validate_data, ValidatedData}; use std::cell::RefCell; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::rc::Rc; use tracing_subscriber::filter::EnvFilter; use types::DefaultLeaderFunction; @@ -61,6 +61,9 @@ impl TestQBFTCommitteeBuilder { struct TestQBFTCommittee)> { msg_queue: Rc)>>>, instances: HashMap>, + // All of the instances that are currently active, allows us to stop/restart instances by + // controlling the messages being send and received + active_instances: HashSet, } /// Constructs and runs committee of QBFT Instances @@ -75,6 +78,7 @@ fn construct_and_run_committee( let msg_queue = Rc::new(RefCell::new(VecDeque::new())); let mut instances = HashMap::with_capacity(config.committee_members().len()); + let mut active_instances = HashSet::new(); for id in 0..config.committee_members().len() { let msg_queue = Rc::clone(&msg_queue); @@ -87,31 +91,52 @@ fn construct_and_run_committee( move |message| msg_queue.borrow_mut().push_back((id, message)), ); instances.insert(id, instance); + active_instances.insert(id); } TestQBFTCommittee { msg_queue, instances, + active_instances, } } impl)> TestQBFTCommittee { - fn wait_until_end(mut self) { + fn wait_until_end(mut self) -> i32 { loop { let msg = self.msg_queue.borrow_mut().pop_front(); let Some((sender, msg)) = msg else { - // we are done! - return; + // we are done! check how many instances reached consensus + let mut num_consensus = 0; + for id in self.active_instances.iter() { + let instance = self.instances.get_mut(id).expect("Instance exists"); + // Check if this instance just reached consensus + if matches!(instance.completed(), Some(Completed::Success(_))) { + num_consensus += 1; + } + } + return num_consensus; }; - for instance in self - .instances - .iter_mut() - .filter_map(|(id, instance)| (id != &sender).then_some(instance)) - { - instance.receive(msg.clone()); + + // Only recieve messages for active instances + for id in self.active_instances.iter() { + if *id != sender { + let instance = self.instances.get_mut(id).expect("Instance exists"); + instance.receive(msg.clone()); + } } } } + + // Pause an qbft instance from running. This will simulate the node going down + pub fn pause_instance(&mut self, id: &OperatorId) { + self.active_instances.remove(id); + } + + /// Restart a paused qbft instance. This will simulate it coming back online + pub fn restart_instance(&mut self, id: &OperatorId) { + self.active_instances.insert(*id); + } } #[derive(Debug, Copy, Clone, Default)] @@ -126,11 +151,150 @@ impl Data for TestData { } #[test] +// Construct and run a test committee fn test_basic_committee() { - // Construct and run a test committee - let test_instance = TestQBFTCommitteeBuilder::default().run(TestData(21)); // Wait until consensus is reached or all the instances have ended - test_instance.wait_until_end(); + let num_consensus = test_instance.wait_until_end(); + assert!(num_consensus == 5); +} + +#[test] +// Test consensus recovery with F faulty operators +fn test_consensus_with_f_faulty_operators() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(21)); + + test_instance.pause_instance(&OperatorId::from(2)); + + // Wait until consensus is reached or all the instances have ended + let num_consensus = test_instance.wait_until_end(); + assert!(num_consensus == 4); +} + +#[test] +fn test_node_recovery() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Pause a node + test_instance.pause_instance(&OperatorId::from(0)); + + // Then restart it + test_instance.restart_instance(&OperatorId::from(0)); + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 5); // Should reach full consensus after recovery +} + +#[test] +fn test_duplicate_proposals() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Send duplicate propose messages + let msg = Message::Propose( + OperatorId::from(0), + ConsensusData { + round: Round::default(), + data: TestData(42), + }, + ); + + // Send the same message multiple times + for id in 0..5 { + let instance = test_instance + .instances + .get_mut(&OperatorId::from(id)) + .unwrap(); + instance.receive(msg.clone()); + instance.receive(msg.clone()); + instance.receive(msg.clone()); + } + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 5); // Should still reach consensus despite duplicates +} + +#[test] +fn test_invalid_sender() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Create a message from an invalid sender (operator id 10 which isn't in the committee) + let invalid_msg = Message::Propose( + OperatorId::from(10), + ConsensusData { + round: Round::default(), + data: TestData(42), + }, + ); + + // Send to a valid instance + let instance = test_instance + .instances + .get_mut(&OperatorId::from(0)) + .unwrap(); + instance.receive(invalid_msg); + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 5); // Should ignore invalid sender and still reach consensus +} + +#[test] +fn test_proposal_from_non_leader() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Send proposal from non-leader (node 1) + let non_leader_msg = Message::Propose( + OperatorId::from(1), + ConsensusData { + round: Round::default(), + data: TestData(42), + }, + ); + + // Send to all instances + for instance in test_instance.instances.values_mut() { + instance.receive(non_leader_msg.clone()); + } + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 5); // Should ignore non-leader proposal and still reach consensus +} + +#[test] +fn test_invalid_round_messages() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Create a message with an invalid round number + let future_round = Round::default().next().unwrap().next().unwrap(); // Round 3 + let invalid_round_msg = Message::Prepare( + OperatorId::from(0), + ConsensusData { + round: future_round, + data: 42, + }, + ); + + // Send to all instances + for instance in test_instance.instances.values_mut() { + instance.receive(invalid_round_msg.clone()); + } + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 5); // Should ignore invalid round messages and still reach consensus +} + +#[test] +fn test_round_change_timeout() { + let mut test_instance = TestQBFTCommitteeBuilder::default().run(TestData(42)); + + // Pause the leader node to force a round change + test_instance.pause_instance(&OperatorId::from(0)); + + // Manually trigger round changes in all instances + for instance in test_instance.instances.values_mut() { + instance.end_round(); + } + + let num_consensus = test_instance.wait_until_end(); + assert_eq!(num_consensus, 4); // Should reach consensus with new leader } diff --git a/anchor/common/qbft/src/types.rs b/anchor/common/qbft/src/types.rs index 87d329ee..16b1de0a 100644 --- a/anchor/common/qbft/src/types.rs +++ b/anchor/common/qbft/src/types.rs @@ -85,6 +85,8 @@ pub enum InstanceState { SentRoundChange = 4, /// The consensus instance is complete Complete, + /// We have reached consensus on a round change + RoundChangeConsensus, } /// Generic Data trait to allow for future implementations of the QBFT module diff --git a/book/book.toml b/book/book.toml index 65b5a658..457ea5bf 100644 --- a/book/book.toml +++ b/book/book.toml @@ -6,7 +6,7 @@ title = "Anchor Book" author = "Sigma Prime" [output.html] -additional-css =["src/css/custom.css"] +additional-css = ["src/css/custom.css"] default-theme = "coal" additional-js = ["mermaid.min.js", "mermaid-init.js"]