From e3a535e36d16348b8d5b845be6d5cd613d3d7858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 11 Feb 2023 15:52:39 +0800 Subject: [PATCH 1/2] Refactor: vote checking and handling is same for every incoming event. --- openraft/src/core/install_snapshot.rs | 2 - openraft/src/engine/engine_impl.rs | 48 +++++++++++++-------- openraft/src/engine/handler/vote_handler.rs | 4 ++ 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index 3ba7ccbeb..d8ce2e267 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -48,8 +48,6 @@ impl, S: RaftStorage> RaftCore) -> VoteResponse { - tracing::debug!(req = display(req.summary()), "Engine::handle_vote_req"); - tracing::debug!( + tracing::info!(req = display(req.summary()), "Engine::handle_vote_req"); + tracing::info!( my_vote = display(self.state.get_vote().summary()), my_last_log_id = display(self.state.last_log_id().summary()), "Engine::handle_vote_req" ); - // TODO: refactor - let res = if req.last_log_id.as_ref() >= self.state.last_log_id() { - self.vote_handler().handle_message_vote(&req.vote) - } else { - Err(RejectVoteRequest::ByLastLogId(self.state.last_log_id().copied())) - }; + // The first step is to check log. If the candidate has less log, nothing needs to be done. - let vote_granted = if let Err(reject) = res { - tracing::debug!( - req = display(req.summary()), - err = display(reject), - "reject vote request" - ); - false + if req.last_log_id.as_ref() >= self.state.last_log_id() { + // Ok } else { - true - }; + // The res is not used yet. + // let _res = Err(RejectVoteRequest::ByLastLogId(self.state.last_log_id().copied())); + return VoteResponse { + // Return the updated vote, this way the candidate knows which vote is granted, in case + // the candidate's vote is changed after sending the vote request. + vote: *self.state.get_vote(), + vote_granted: false, + last_log_id: self.state.last_log_id().copied(), + }; + } + + // Then check vote just as it does for every incoming event. + + let res = self.vote_handler().handle_message_vote(&req.vote); + + tracing::info!( + req = display(req.summary()), + result = debug(&res), + "handle vote request result" + ); + + let vote_granted = res.is_ok(); VoteResponse { // Return the updated vote, this way the candidate knows which vote is granted, in case @@ -449,7 +459,7 @@ where return rejected.into(); } - // Vote is legal. Check if prev_log_id matches local raft-log. + // Vote is legal. let mut fh = self.following_handler(); fh.append_entries(prev_log_id, entries, leader_committed) diff --git a/openraft/src/engine/handler/vote_handler.rs b/openraft/src/engine/handler/vote_handler.rs index f23dcbbe6..6810d1957 100644 --- a/openraft/src/engine/handler/vote_handler.rs +++ b/openraft/src/engine/handler/vote_handler.rs @@ -42,6 +42,10 @@ where /// - Leadership won't be lost if a leader restarted quick enough. pub(crate) fn commit_vote(&mut self) { debug_assert!(!self.state.get_vote().committed); + debug_assert_eq!( + self.state.get_vote().node_id == self.config.id, + "it can only commit its own vote" + ); let mut v = *self.state.get_vote(); v.commit(); From 88754e0917db8ca58f6faa93445d7396cddce177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 11 Feb 2023 22:35:18 +0800 Subject: [PATCH 2/2] Improve: make sending rpc-response a command and enqueue it in the command list Sending rpc(vote, append-entries and install-snapshot) response must be after executing the commands generated by Engine. Thus introduce `Command` for doing such actions, e.g., `tx.send(res)`. Command execution can be reordered or batched for performance. Because `oneshot::Sender` is not `Clone`, `Command` and other related types that includes it are adjusted. --- Cargo.toml | 1 + openraft/Cargo.toml | 1 + openraft/src/core/install_snapshot.rs | 44 ++++++++--- openraft/src/core/raft_core.rs | 78 ++++++++++++------- openraft/src/engine/command.rs | 49 +++++++++++- openraft/src/engine/engine_impl.rs | 4 +- .../src/engine/handler/replication_handler.rs | 3 +- openraft/src/engine/handler/vote_handler.rs | 3 +- openraft/src/error.rs | 3 + openraft/src/lib.rs | 14 ++++ openraft/src/raft.rs | 3 +- openraft/src/runtime/mod.rs | 2 +- 12 files changed, 157 insertions(+), 48 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ca82fb30..9695be2a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ async-trait = "0.1.36" byte-unit = "4.0.12" bytes = "1.0" clap = { version = "~3.2", features = ["derive", "env"] } +derivative = "2.2.0" derive_more = { version="0.99.9" } futures = "0.3" lazy_static = "1.4.0" diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 217d055f8..b50ba5680 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -17,6 +17,7 @@ repository = { workspace = true } anyerror = { workspace = true } async-trait = { workspace = true } byte-unit = { workspace = true } +derivative = { workspace = true } derive_more = { workspace = true } futures = { workspace = true } maplit = { workspace = true } diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index d8ce2e267..69bd64225 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -4,10 +4,10 @@ use tokio::io::AsyncWriteExt; use crate::core::streaming_state::StreamingState; use crate::core::RaftCore; use crate::core::SnapshotState; -use crate::error::InstallSnapshotError; use crate::error::SnapshotMismatch; use crate::raft::InstallSnapshotRequest; use crate::raft::InstallSnapshotResponse; +use crate::raft::InstallSnapshotTx; use crate::raft_state::VoteStateReader; use crate::Entry; use crate::ErrorSubject; @@ -32,7 +32,8 @@ impl, S: RaftStorage> RaftCore, - ) -> Result, InstallSnapshotError> { + tx: InstallSnapshotTx, + ) -> Result<(), StorageError> { tracing::debug!(req = display(req.summary())); let res = self.engine.vote_handler().handle_message_vote(&req.vote); @@ -43,9 +44,10 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, - ) -> Result<(), InstallSnapshotError> { + fn check_new_install_snapshot(&mut self, req: &InstallSnapshotRequest) -> Result<(), SnapshotMismatch> { tracing::debug!(req = display(req.summary())); let id = req.meta.snapshot_id.clone(); @@ -118,10 +127,21 @@ impl, S: RaftStorage> RaftCore, + ) -> Result<(), StorageError> { + tracing::debug!(req = display(req.summary())); + + let id = req.meta.snapshot_id.clone(); + let snapshot_data = self.storage.begin_receiving_snapshot().await?; self.snapshot_state = SnapshotState::Streaming(StreamingState::new(id, snapshot_data)); diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 51885ae6c..0dc8db094 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -54,7 +54,6 @@ use crate::error::NetworkError; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; -use crate::error::VoteError; use crate::metrics::RaftMetrics; use crate::metrics::ReplicationMetrics; use crate::metrics::UpdateMatchedLogId; @@ -65,6 +64,7 @@ use crate::quorum::QuorumSet; use crate::raft::AddLearnerResponse; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; +use crate::raft::AppendEntriesTx; use crate::raft::ClientWriteResponse; use crate::raft::ClientWriteTx; use crate::raft::ExternalCommand; @@ -73,6 +73,7 @@ use crate::raft::RaftMsg; use crate::raft::RaftRespTx; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::raft::VoteTx; use crate::raft_state::LogStateReader; use crate::raft_state::VoteStateReader; use crate::raft_types::LogIdOptionExt; @@ -1006,7 +1007,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, - ) -> Result, VoteError> { - tracing::debug!(req = display(req.summary()), "handle_vote_request"); + tx: VoteTx, + ) -> Result<(), StorageError> { + tracing::debug!(req = display(req.summary()), func = func_name!()); let resp = self.engine.handle_vote_req(req); + self.engine.output.push_command(Command::SendVoteResult { res: Ok(resp), tx }); + self.run_engine_commands::>(&[]).await?; - Ok(resp) + Ok(()) } /// Handle response from a vote request sent to a peer. @@ -1140,19 +1144,31 @@ impl, S: RaftStorage> RaftCore, + tx: AppendEntriesTx, + ) -> Result<(), StorageError> { + tracing::debug!(req = display(req.summary()), func = func_name!()); + + let resp = self.engine.handle_append_entries_req(&req.vote, req.prev_log_id, &req.entries, req.leader_commit); + self.engine.output.push_command(Command::SendAppendEntriesResult { res: Ok(resp), tx }); + + self.run_engine_commands(req.entries.as_slice()).await?; + Ok(()) + } + #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))] pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg) -> Result<(), Fatal> { tracing::debug!("recv from rx_api: {}", msg.summary()); match msg { RaftMsg::AppendEntries { rpc, tx } => { - let resp = - self.engine.handle_append_entries_req(&rpc.vote, rpc.prev_log_id, &rpc.entries, rpc.leader_commit); - self.run_engine_commands(rpc.entries.as_slice()).await?; - let _ = tx.send(Ok(resp)); + self.handle_append_entries_request(rpc, tx).await?; } RaftMsg::RequestVote { rpc, tx } => { - let _ = tx.send(self.handle_vote_request(rpc).await.extract_fatal()?); + self.handle_vote_request(rpc, tx).await?; } RaftMsg::VoteResponse { target, resp, vote } => { if self.does_vote_match(&vote, "VoteResponse") { @@ -1160,7 +1176,7 @@ impl, S: RaftStorage> RaftCore { - let _ = tx.send(self.handle_install_snapshot_request(rpc).await.extract_fatal()?); + self.handle_install_snapshot_request(rpc, tx).await?; } RaftMsg::BuildingSnapshotResult { result } => { self.handle_building_snapshot_result(result).await?; @@ -1405,7 +1421,7 @@ impl, S: RaftStorage> RaftRuntime &mut self, input_ref_entries: &'e [Ent], cur: &mut usize, - cmd: &Command, + cmd: Command, ) -> Result<(), StorageError> where Ent: RaftLogId + Sync + Send + 'e, @@ -1444,7 +1460,7 @@ impl, S: RaftStorage> RaftRuntime } Command::AppendBlankLog { log_id } => { let ent = Entry { - log_id: *log_id, + log_id, payload: EntryPayload::Blank, }; let entry_refs = vec![&ent]; @@ -1452,24 +1468,24 @@ impl, S: RaftStorage> RaftRuntime } Command::MoveInputCursorBy { n } => *cur += n, Command::SaveVote { vote } => { - self.storage.save_vote(vote).await?; + self.storage.save_vote(&vote).await?; } Command::InstallElectionTimer { can_be_leader } => { - self.set_next_election_time(*can_be_leader); + self.set_next_election_time(can_be_leader); } - Command::PurgeLog { upto } => self.storage.purge_logs_upto(*upto).await?, + Command::PurgeLog { upto } => self.storage.purge_logs_upto(upto).await?, Command::DeleteConflictLog { since } => { - self.storage.delete_conflict_logs_since(*since).await?; + self.storage.delete_conflict_logs_since(since).await?; } // TODO(2): Engine initiate a snapshot building Command::BuildSnapshot { .. } => {} Command::SendVote { vote_req } => { - self.spawn_parallel_vote_requests(vote_req).await; + self.spawn_parallel_vote_requests(&vote_req).await; } Command::ReplicateCommitted { committed } => { if let Some(l) = &self.leader_data { for node in l.nodes.values() { - let _ = node.tx_repl.send(Replicate::Committed(*committed)); + let _ = node.tx_repl.send(Replicate::Committed(committed)); } } else { unreachable!("it has to be a leader!!!"); @@ -1492,7 +1508,7 @@ impl, S: RaftStorage> RaftRuntime // TODO(2): consider remove the returned error from new_client(). // Node may not exist because `RaftNetworkFactory::new_client()` returns an // error. - let node = &l.nodes.get(target); + let node = &l.nodes.get(&target); if let Some(node) = node { match req { @@ -1500,18 +1516,15 @@ impl, S: RaftStorage> RaftRuntime unreachable!("Inflight::None"); } Inflight::Logs { id, log_id_range } => { - let _ = node.tx_repl.send(Replicate::Logs { - id: *id, - log_id_range: *log_id_range, - }); + let _ = node.tx_repl.send(Replicate::Logs { id, log_id_range }); } Inflight::Snapshot { id, last_log_id } => { let snapshot = self.storage.get_current_snapshot().await?; tracing::debug!("snapshot: {}", snapshot.as_ref().map(|x| &x.meta).summary()); if let Some(snapshot) = snapshot { - debug_assert_eq!(last_log_id, &snapshot.meta.last_log_id); - let _ = node.tx_repl.send(Replicate::Snapshot { id: *id, snapshot }); + debug_assert_eq!(last_log_id, snapshot.meta.last_log_id); + let _ = node.tx_repl.send(Replicate::Snapshot { id, snapshot }); } else { unreachable!("No snapshot"); } @@ -1544,7 +1557,7 @@ impl, S: RaftStorage> RaftRuntime } } Command::UpdateProgressMetrics { target, matching } => { - self.update_progress_metrics(*target, *matching); + self.update_progress_metrics(target, matching); } Command::UpdateMembership { .. } => { // TODO: not used @@ -1557,12 +1570,21 @@ impl, S: RaftStorage> RaftRuntime let snapshot_data = self.received_snapshot.remove(&snapshot_meta.snapshot_id); if let Some(data) = snapshot_data { - self.storage.install_snapshot(snapshot_meta, data).await?; + self.storage.install_snapshot(&snapshot_meta, data).await?; tracing::debug!("Done install_snapshot, meta: {:?}", snapshot_meta); } else { unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta) } } + Command::SendVoteResult { res, tx } => { + let _ = tx.send(res); + } + Command::SendAppendEntriesResult { res, tx } => { + let _ = tx.send(res); + } + Command::SendInstallSnapshotResult { res, tx } => { + let _ = tx.send(res); + } } Ok(()) diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 781cbfaa6..2621384ea 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -1,9 +1,18 @@ use std::ops::Range; use std::sync::Arc; +use crate::error::AppendEntriesError; +use crate::error::InstallSnapshotError; +use crate::error::VoteError; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; +use crate::raft::AppendEntriesResponse; +use crate::raft::AppendEntriesTx; +use crate::raft::InstallSnapshotResponse; +use crate::raft::InstallSnapshotTx; use crate::raft::VoteRequest; +use crate::raft::VoteResponse; +use crate::raft::VoteTx; use crate::EffectiveMembership; use crate::LogId; use crate::MetricsChangeFlags; @@ -13,7 +22,9 @@ use crate::SnapshotMeta; use crate::Vote; /// Commands to send to `RaftRuntime` to execute, to update the application state. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(derivative::Derivative)] +#[derivative(PartialEq)] +#[derive(Debug)] pub(crate) enum Command where N: Node, @@ -116,6 +127,32 @@ where /// A received snapshot does not need to be installed, just drop buffered snapshot data. CancelSnapshot { snapshot_meta: SnapshotMeta }, + // --- + // --- Response commands + // --- + /// Send vote result `res` to its caller via `tx` + SendVoteResult { + res: Result, VoteError>, + #[derivative(PartialEq = "ignore")] + tx: VoteTx, + }, + + /// Send append-entries result `res` to its caller via `tx` + SendAppendEntriesResult { + res: Result, AppendEntriesError>, + #[derivative(PartialEq = "ignore")] + tx: AppendEntriesTx, + }, + + // TODO: use it + #[allow(dead_code)] + /// Send install-snapshot result `res` to its caller via `tx` + SendInstallSnapshotResult { + res: Result, InstallSnapshotError>, + #[derivative(PartialEq = "ignore")] + tx: InstallSnapshotTx, + }, + // // --- Draft unimplemented commands: @@ -124,6 +161,13 @@ where BuildSnapshot {}, } +impl Eq for Command +where + N: Node, + NID: NodeId, +{ +} + impl Command where N: Node, @@ -152,6 +196,9 @@ where Command::InstallSnapshot { .. } => flags.set_data_changed(), Command::CancelSnapshot { .. } => {} Command::BuildSnapshot { .. } => flags.set_data_changed(), + Command::SendVoteResult { .. } => {} + Command::SendAppendEntriesResult { .. } => {} + Command::SendInstallSnapshotResult { .. } => {} } } } diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 251a949d4..05d2c5af7 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -60,7 +60,7 @@ impl Default for EngineConfig { } /// The entry of output from Engine to the runtime. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default)] #[derive(PartialEq, Eq)] pub(crate) struct EngineOutput where @@ -94,7 +94,7 @@ where /// This structure only contains necessary information to run raft algorithm, /// but none of the application specific data. /// TODO: make the fields private -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default)] #[derive(PartialEq, Eq)] pub(crate) struct Engine where diff --git a/openraft/src/engine/handler/replication_handler.rs b/openraft/src/engine/handler/replication_handler.rs index 87a0251af..fabfb6175 100644 --- a/openraft/src/engine/handler/replication_handler.rs +++ b/openraft/src/engine/handler/replication_handler.rs @@ -434,11 +434,10 @@ mod tests { #[test] fn test_update_matching_no_leader() -> anyhow::Result<()> { - let mut eng = eng(); - // There is no leader, it should panic. let res = std::panic::catch_unwind(move || { + let mut eng = eng(); eng.replication_handler().update_matching(3, 0, Some(log_id(1, 2))); }); tracing::info!("res: {:?}", res); diff --git a/openraft/src/engine/handler/vote_handler.rs b/openraft/src/engine/handler/vote_handler.rs index 6810d1957..cd4d48fe7 100644 --- a/openraft/src/engine/handler/vote_handler.rs +++ b/openraft/src/engine/handler/vote_handler.rs @@ -43,7 +43,8 @@ where pub(crate) fn commit_vote(&mut self) { debug_assert!(!self.state.get_vote().committed); debug_assert_eq!( - self.state.get_vote().node_id == self.config.id, + self.state.get_vote().node_id, + self.config.id, "it can only commit its own vote" ); diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 31ff7de29..9e43aacb3 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -63,6 +63,7 @@ where // TODO: not used, remove #[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub enum AppendEntriesError where NID: NodeId @@ -73,6 +74,7 @@ where NID: NodeId // TODO: not used, remove #[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub enum VoteError where NID: NodeId @@ -83,6 +85,7 @@ where NID: NodeId // TODO: remove #[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub enum InstallSnapshotError where NID: NodeId diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 88ad0a8ae..e3f6dd213 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -17,6 +17,20 @@ //! - `serde`: Add serde::Serialize and serde:Deserialize bound to data types. If you'd like to use //! `serde` to serialize messages. +macro_rules! func_name { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + std::any::type_name::() + } + let name = type_name_of(f); + let n = &name[..name.len() - 3]; + let nn = n.replace("::{{closure}}", ""); + nn + // nn.split("::").last().unwrap_or_default() + }}; +} + mod change_members; mod config; mod core; diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index a96c2f931..7b609ca49 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -857,7 +857,7 @@ pub struct AddLearnerResponse { pub matched: Option>, } -/// TX for Add Learner Respose +/// TX for Add Learner Response pub(crate) type RaftAddLearnerTx = RaftRespTx, AddLearnerError>; /// TX for Install Snapshot Response @@ -1239,6 +1239,7 @@ impl MessageSummary> for InstallSna /// The response to an `InstallSnapshotRequest`. #[derive(Debug)] +#[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct InstallSnapshotResponse { pub vote: Vote, diff --git a/openraft/src/runtime/mod.rs b/openraft/src/runtime/mod.rs index 555eb80d9..f43254481 100644 --- a/openraft/src/runtime/mod.rs +++ b/openraft/src/runtime/mod.rs @@ -77,7 +77,7 @@ pub(crate) trait RaftRuntime { &mut self, input_entries: &'e [Ent], curr: &mut usize, - cmd: &Command, + cmd: Command, ) -> Result<(), StorageError> where Ent: RaftLogId + Sync + Send + 'e,