Skip to content

Commit

Permalink
Refactor: add SendResult for sending result to API caller
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Feb 12, 2023
1 parent 22f01b2 commit 9ac025d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 34 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ async-trait = "0.1.36"
byte-unit = "4.0.12"
bytes = "1.0"
clap = { version = "~3.2", features = ["derive", "env"] }
derivative = "2.2.0"
derive_more = { version="0.99.9" }
futures = "0.3"
lazy_static = "1.4.0"
Expand Down
1 change: 0 additions & 1 deletion openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ repository = { workspace = true }
anyerror = { workspace = true }
async-trait = { workspace = true }
byte-unit = { workspace = true }
derivative = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
maplit = { workspace = true }
Expand Down
35 changes: 24 additions & 11 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::core::SnapshotState;
use crate::core::VoteWiseTime;
use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::SendResult;
use crate::entry::EntryRef;
use crate::error::AddLearnerError;
use crate::error::ChangeMembershipError;
Expand Down Expand Up @@ -672,12 +673,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(crate) async fn handle_initialize(
&mut self,
member_nodes: BTreeMap<C::NodeId, C::Node>,
) -> Result<(), InitializeError<C::NodeId, C::Node>> {
tx: RaftRespTx<(), InitializeError<C::NodeId, C::Node>>,
) -> Result<(), StorageError<C::NodeId>> {
let membership = Membership::from(member_nodes);
let payload = EntryPayload::<C>::Membership(membership);

let mut entry_refs = [EntryRef::new(&payload)];
self.engine.initialize(&mut entry_refs)?;
let res = self.engine.initialize(&mut entry_refs);
self.engine.output.push_command(Command::SendInitializeResult {
send: SendResult::new(res, tx),
});

self.run_engine_commands(&entry_refs).await?;

Ok(())
Expand Down Expand Up @@ -1116,7 +1122,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
tracing::debug!(req = display(req.summary()), func = func_name!());

let resp = self.engine.handle_vote_req(req);
self.engine.output.push_command(Command::SendVoteResult { res: Ok(resp), tx });
self.engine.output.push_command(Command::SendVoteResult {
send: SendResult::new(Ok(resp), tx),
});

self.run_engine_commands::<Entry<C>>(&[]).await?;

Expand Down Expand Up @@ -1153,7 +1161,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
tracing::debug!(req = display(req.summary()), func = func_name!());

let resp = self.engine.handle_append_entries_req(&req.vote, req.prev_log_id, &req.entries, req.leader_commit);
self.engine.output.push_command(Command::SendAppendEntriesResult { res: Ok(resp), tx });
self.engine.output.push_command(Command::SendAppendEntriesResult {
send: SendResult::new(Ok(resp), tx),
});

self.run_engine_commands(req.entries.as_slice()).await?;
Ok(())
Expand Down Expand Up @@ -1196,7 +1206,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}
RaftMsg::Initialize { members, tx } => {
let _ = tx.send(self.handle_initialize(members).await.extract_fatal()?);
self.handle_initialize(members, tx).await?;
}
RaftMsg::AddLearner { id, node, tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
Expand Down Expand Up @@ -1576,14 +1586,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta)
}
}
Command::SendVoteResult { res, tx } => {
let _ = tx.send(res);
Command::SendVoteResult { send } => {
send.send();
}
Command::SendAppendEntriesResult { send } => {
send.send();
}
Command::SendAppendEntriesResult { res, tx } => {
let _ = tx.send(res);
Command::SendInstallSnapshotResult { send } => {
send.send();
}
Command::SendInstallSnapshotResult { res, tx } => {
let _ = tx.send(res);
Command::SendInitializeResult { send } => {
send.send();
}
}

Expand Down
65 changes: 44 additions & 21 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

use tokio::sync::oneshot;

use crate::error::AppendEntriesError;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::VoteError;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft::AppendEntriesResponse;
use crate::raft::AppendEntriesTx;
use crate::raft::InstallSnapshotResponse;
use crate::raft::InstallSnapshotTx;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::VoteTx;
use crate::EffectiveMembership;
use crate::LogId;
use crate::MetricsChangeFlags;
Expand All @@ -22,9 +23,8 @@ use crate::SnapshotMeta;
use crate::Vote;

/// Commands to send to `RaftRuntime` to execute, to update the application state.
#[derive(derivative::Derivative)]
#[derivative(PartialEq)]
#[derive(Debug)]
#[derive(PartialEq, Eq)]
pub(crate) enum Command<NID, N>
where
N: Node,
Expand Down Expand Up @@ -132,25 +132,24 @@ where
// ---
/// Send vote result `res` to its caller via `tx`
SendVoteResult {
res: Result<VoteResponse<NID>, VoteError<NID>>,
#[derivative(PartialEq = "ignore")]
tx: VoteTx<NID>,
send: SendResult<Result<VoteResponse<NID>, VoteError<NID>>>,
},

/// Send append-entries result `res` to its caller via `tx`
SendAppendEntriesResult {
res: Result<AppendEntriesResponse<NID>, AppendEntriesError<NID>>,
#[derivative(PartialEq = "ignore")]
tx: AppendEntriesTx<NID>,
send: SendResult<Result<AppendEntriesResponse<NID>, AppendEntriesError<NID>>>,
},

// TODO: use it
#[allow(dead_code)]
/// Send install-snapshot result `res` to its caller via `tx`
SendInstallSnapshotResult {
res: Result<InstallSnapshotResponse<NID>, InstallSnapshotError<NID>>,
#[derivative(PartialEq = "ignore")]
tx: InstallSnapshotTx<NID>,
send: SendResult<Result<InstallSnapshotResponse<NID>, InstallSnapshotError<NID>>>,
},

/// Send install-snapshot result `res` to its caller via `tx`
SendInitializeResult {
send: SendResult<Result<(), InitializeError<NID, N>>>,
},

//
Expand All @@ -161,13 +160,6 @@ where
BuildSnapshot {},
}

impl<NID, N> Eq for Command<NID, N>
where
N: Node,
NID: NodeId,
{
}

impl<NID, N> Command<NID, N>
where
N: Node,
Expand Down Expand Up @@ -199,6 +191,37 @@ where
Command::SendVoteResult { .. } => {}
Command::SendAppendEntriesResult { .. } => {}
Command::SendInstallSnapshotResult { .. } => {}
Command::SendInitializeResult { .. } => {}
}
}
}

#[derive(Debug)]
pub(crate) struct SendResult<T>
where T: Debug + PartialEq + Eq
{
res: T,
tx: oneshot::Sender<T>,
}

impl<T> PartialEq for SendResult<T>
where T: Debug + PartialEq + Eq
{
fn eq(&self, other: &Self) -> bool {
self.res == other.res
}
}

impl<T> Eq for SendResult<T> where T: Debug + PartialEq + Eq {}

impl<T> SendResult<T>
where T: Debug + PartialEq + Eq
{
pub(crate) fn new(res: T, tx: oneshot::Sender<T>) -> Self {
Self { res, tx }
}

pub(crate) fn send(self) {
let _ = self.tx.send(self.res);
}
}
1 change: 1 addition & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod log_id_list;
#[cfg(test)] mod update_progress_test;

pub(crate) use command::Command;
pub(crate) use command::SendResult;
pub(crate) use engine_impl::Engine;
pub(crate) use engine_impl::EngineConfig;
pub use log_id_list::LogIdList;

0 comments on commit 9ac025d

Please sign in to comment.