From 9ac025db8dd53a8ef217792374e6e8bff2a67510 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 12 Feb 2023 09:19:18 +0800 Subject: [PATCH] Refactor: add SendResult for sending result to API caller --- Cargo.toml | 1 - openraft/Cargo.toml | 1 - openraft/src/core/raft_core.rs | 35 ++++++++++++------ openraft/src/engine/command.rs | 65 +++++++++++++++++++++++----------- openraft/src/engine/mod.rs | 1 + 5 files changed, 69 insertions(+), 34 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9695be2a1..5ca82fb30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ async-trait = "0.1.36" byte-unit = "4.0.12" bytes = "1.0" clap = { version = "~3.2", features = ["derive", "env"] } -derivative = "2.2.0" derive_more = { version="0.99.9" } futures = "0.3" lazy_static = "1.4.0" diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index b50ba5680..217d055f8 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -17,7 +17,6 @@ repository = { workspace = true } anyerror = { workspace = true } async-trait = { workspace = true } byte-unit = { workspace = true } -derivative = { workspace = true } derive_more = { workspace = true } futures = { workspace = true } maplit = { workspace = true } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 0dc8db094..c44a6d12c 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -37,6 +37,7 @@ use crate::core::SnapshotState; use crate::core::VoteWiseTime; use crate::engine::Command; use crate::engine::Engine; +use crate::engine::SendResult; use crate::entry::EntryRef; use crate::error::AddLearnerError; use crate::error::ChangeMembershipError; @@ -672,12 +673,17 @@ impl, S: RaftStorage> RaftCore, - ) -> Result<(), InitializeError> { + tx: RaftRespTx<(), InitializeError>, + ) -> Result<(), StorageError> { let membership = Membership::from(member_nodes); let payload = EntryPayload::::Membership(membership); let mut entry_refs = [EntryRef::new(&payload)]; - self.engine.initialize(&mut entry_refs)?; + let res = self.engine.initialize(&mut entry_refs); + self.engine.output.push_command(Command::SendInitializeResult { + send: SendResult::new(res, tx), + }); + self.run_engine_commands(&entry_refs).await?; Ok(()) @@ -1116,7 +1122,9 @@ impl, S: RaftStorage> RaftCore>(&[]).await?; @@ -1153,7 +1161,9 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore { - let _ = tx.send(self.handle_initialize(members).await.extract_fatal()?); + self.handle_initialize(members, tx).await?; } RaftMsg::AddLearner { id, node, tx } => { if self.engine.state.is_leader(&self.engine.config.id) { @@ -1576,14 +1586,17 @@ impl, S: RaftStorage> RaftRuntime unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta) } } - Command::SendVoteResult { res, tx } => { - let _ = tx.send(res); + Command::SendVoteResult { send } => { + send.send(); + } + Command::SendAppendEntriesResult { send } => { + send.send(); } - Command::SendAppendEntriesResult { res, tx } => { - let _ = tx.send(res); + Command::SendInstallSnapshotResult { send } => { + send.send(); } - Command::SendInstallSnapshotResult { res, tx } => { - let _ = tx.send(res); + Command::SendInitializeResult { send } => { + send.send(); } } diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 2621384ea..2688a3dbd 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -1,18 +1,19 @@ +use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; +use tokio::sync::oneshot; + use crate::error::AppendEntriesError; +use crate::error::InitializeError; use crate::error::InstallSnapshotError; use crate::error::VoteError; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::raft::AppendEntriesResponse; -use crate::raft::AppendEntriesTx; use crate::raft::InstallSnapshotResponse; -use crate::raft::InstallSnapshotTx; use crate::raft::VoteRequest; use crate::raft::VoteResponse; -use crate::raft::VoteTx; use crate::EffectiveMembership; use crate::LogId; use crate::MetricsChangeFlags; @@ -22,9 +23,8 @@ use crate::SnapshotMeta; use crate::Vote; /// Commands to send to `RaftRuntime` to execute, to update the application state. -#[derive(derivative::Derivative)] -#[derivative(PartialEq)] #[derive(Debug)] +#[derive(PartialEq, Eq)] pub(crate) enum Command where N: Node, @@ -132,25 +132,24 @@ where // --- /// Send vote result `res` to its caller via `tx` SendVoteResult { - res: Result, VoteError>, - #[derivative(PartialEq = "ignore")] - tx: VoteTx, + send: SendResult, VoteError>>, }, /// Send append-entries result `res` to its caller via `tx` SendAppendEntriesResult { - res: Result, AppendEntriesError>, - #[derivative(PartialEq = "ignore")] - tx: AppendEntriesTx, + send: SendResult, AppendEntriesError>>, }, // TODO: use it #[allow(dead_code)] /// Send install-snapshot result `res` to its caller via `tx` SendInstallSnapshotResult { - res: Result, InstallSnapshotError>, - #[derivative(PartialEq = "ignore")] - tx: InstallSnapshotTx, + send: SendResult, InstallSnapshotError>>, + }, + + /// Send install-snapshot result `res` to its caller via `tx` + SendInitializeResult { + send: SendResult>>, }, // @@ -161,13 +160,6 @@ where BuildSnapshot {}, } -impl Eq for Command -where - N: Node, - NID: NodeId, -{ -} - impl Command where N: Node, @@ -199,6 +191,37 @@ where Command::SendVoteResult { .. } => {} Command::SendAppendEntriesResult { .. } => {} Command::SendInstallSnapshotResult { .. } => {} + Command::SendInitializeResult { .. } => {} } } } + +#[derive(Debug)] +pub(crate) struct SendResult +where T: Debug + PartialEq + Eq +{ + res: T, + tx: oneshot::Sender, +} + +impl PartialEq for SendResult +where T: Debug + PartialEq + Eq +{ + fn eq(&self, other: &Self) -> bool { + self.res == other.res + } +} + +impl Eq for SendResult where T: Debug + PartialEq + Eq {} + +impl SendResult +where T: Debug + PartialEq + Eq +{ + pub(crate) fn new(res: T, tx: oneshot::Sender) -> Self { + Self { res, tx } + } + + pub(crate) fn send(self) { + let _ = self.tx.send(self.res); + } +} diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index bde9d2205..fabe2920c 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -44,6 +44,7 @@ mod log_id_list; #[cfg(test)] mod update_progress_test; pub(crate) use command::Command; +pub(crate) use command::SendResult; pub(crate) use engine_impl::Engine; pub(crate) use engine_impl::EngineConfig; pub use log_id_list::LogIdList;