Skip to content

Commit

Permalink
Merge pull request #667 from drmingdrmer/80-following
Browse files Browse the repository at this point in the history
Improve: make sending rpc-response a command and enqueue it in the command list
  • Loading branch information
drmingdrmer authored Feb 11, 2023
2 parents 7d3639b + 88754e0 commit 22f01b2
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = "0.1.36"
byte-unit = "4.0.12"
bytes = "1.0"
clap = { version = "~3.2", features = ["derive", "env"] }
derivative = "2.2.0"
derive_more = { version="0.99.9" }
futures = "0.3"
lazy_static = "1.4.0"
Expand Down
1 change: 1 addition & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ repository = { workspace = true }
anyerror = { workspace = true }
async-trait = { workspace = true }
byte-unit = { workspace = true }
derivative = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
maplit = { workspace = true }
Expand Down
46 changes: 32 additions & 14 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use tokio::io::AsyncWriteExt;
use crate::core::streaming_state::StreamingState;
use crate::core::RaftCore;
use crate::core::SnapshotState;
use crate::error::InstallSnapshotError;
use crate::error::SnapshotMismatch;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::InstallSnapshotTx;
use crate::raft_state::VoteStateReader;
use crate::Entry;
use crate::ErrorSubject;
Expand All @@ -32,7 +32,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(super) async fn handle_install_snapshot_request(
&mut self,
req: InstallSnapshotRequest<C>,
) -> Result<InstallSnapshotResponse<C::NodeId>, InstallSnapshotError<C::NodeId>> {
tx: InstallSnapshotTx<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(req = display(req.summary()));

let res = self.engine.vote_handler().handle_message_vote(&req.vote);
Expand All @@ -43,13 +44,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
req_vote = display(&req.vote),
"InstallSnapshot RPC term is less than current term, ignoring it."
);
return Ok(InstallSnapshotResponse {
let _ = tx.send(Ok(InstallSnapshotResponse {
vote: *self.engine.state.get_vote(),
});
}));
return Ok(());
}

self.set_next_election_time(false);

// Clear the state to None if it is building a snapshot locally.
if let SnapshotState::Snapshotting {
abort_handle,
Expand All @@ -68,6 +68,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

// Init a new streaming state if it is None.
if let SnapshotState::None = self.snapshot_state {
if let Err(e) = self.check_new_install_snapshot(&req) {
let _ = tx.send(Err(e.into()));
return Ok(());
}
self.begin_installing_snapshot(&req).await?;
}

Expand All @@ -84,6 +88,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
};

if stream_changed {
if let Err(e) = self.check_new_install_snapshot(&req) {
let _ = tx.send(Err(e.into()));
return Ok(());
}
self.begin_installing_snapshot(&req).await?;
}

Expand All @@ -99,16 +107,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.finalize_snapshot_installation(req_meta).await?;
}

Ok(InstallSnapshotResponse {
let _ = tx.send(Ok(InstallSnapshotResponse {
vote: *self.engine.state.get_vote(),
})
}));

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn begin_installing_snapshot(
&mut self,
req: &InstallSnapshotRequest<C>,
) -> Result<(), InstallSnapshotError<C::NodeId>> {
fn check_new_install_snapshot(&mut self, req: &InstallSnapshotRequest<C>) -> Result<(), SnapshotMismatch> {
tracing::debug!(req = display(req.summary()));

let id = req.meta.snapshot_id.clone();
Expand All @@ -120,10 +127,21 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
offset: 0,
},
got: SnapshotSegmentId { id, offset: req.offset },
}
.into());
});
}

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn begin_installing_snapshot(
&mut self,
req: &InstallSnapshotRequest<C>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(req = display(req.summary()));

let id = req.meta.snapshot_id.clone();

let snapshot_data = self.storage.begin_receiving_snapshot().await?;
self.snapshot_state = SnapshotState::Streaming(StreamingState::new(id, snapshot_data));

Expand Down
78 changes: 50 additions & 28 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use crate::error::NetworkError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::error::VoteError;
use crate::metrics::RaftMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::UpdateMatchedLogId;
Expand All @@ -65,6 +64,7 @@ use crate::quorum::QuorumSet;
use crate::raft::AddLearnerResponse;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::AppendEntriesTx;
use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteTx;
use crate::raft::ExternalCommand;
Expand All @@ -73,6 +73,7 @@ use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::VoteTx;
use crate::raft_state::LogStateReader;
use crate::raft_state::VoteStateReader;
use crate::raft_types::LogIdOptionExt;
Expand Down Expand Up @@ -1006,7 +1007,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
swap(&mut self.engine.output.commands, &mut commands);
for cmd in commands {
tracing::debug!("run command: {:?}", cmd);
self.run_command(input_entries, &mut curr, &cmd).await?;
self.run_command(input_entries, &mut curr, cmd).await?;
}

Ok(())
Expand Down Expand Up @@ -1110,13 +1111,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(super) async fn handle_vote_request(
&mut self,
req: VoteRequest<C::NodeId>,
) -> Result<VoteResponse<C::NodeId>, VoteError<C::NodeId>> {
tracing::debug!(req = display(req.summary()), "handle_vote_request");
tx: VoteTx<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(req = display(req.summary()), func = func_name!());

let resp = self.engine.handle_vote_req(req);
self.engine.output.push_command(Command::SendVoteResult { res: Ok(resp), tx });

self.run_engine_commands::<Entry<C>>(&[]).await?;

Ok(resp)
Ok(())
}

/// Handle response from a vote request sent to a peer.
Expand All @@ -1140,27 +1144,39 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
pub(super) async fn handle_append_entries_request(
&mut self,
req: AppendEntriesRequest<C>,
tx: AppendEntriesTx<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(req = display(req.summary()), func = func_name!());

let resp = self.engine.handle_append_entries_req(&req.vote, req.prev_log_id, &req.entries, req.leader_commit);
self.engine.output.push_command(Command::SendAppendEntriesResult { res: Ok(resp), tx });

self.run_engine_commands(req.entries.as_slice()).await?;
Ok(())
}

#[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C, N, S>) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("recv from rx_api: {}", msg.summary());

match msg {
RaftMsg::AppendEntries { rpc, tx } => {
let resp =
self.engine.handle_append_entries_req(&rpc.vote, rpc.prev_log_id, &rpc.entries, rpc.leader_commit);
self.run_engine_commands(rpc.entries.as_slice()).await?;
let _ = tx.send(Ok(resp));
self.handle_append_entries_request(rpc, tx).await?;
}
RaftMsg::RequestVote { rpc, tx } => {
let _ = tx.send(self.handle_vote_request(rpc).await.extract_fatal()?);
self.handle_vote_request(rpc, tx).await?;
}
RaftMsg::VoteResponse { target, resp, vote } => {
if self.does_vote_match(&vote, "VoteResponse") {
self.handle_vote_resp(resp, target).await?;
}
}
RaftMsg::InstallSnapshot { rpc, tx } => {
let _ = tx.send(self.handle_install_snapshot_request(rpc).await.extract_fatal()?);
self.handle_install_snapshot_request(rpc, tx).await?;
}
RaftMsg::BuildingSnapshotResult { result } => {
self.handle_building_snapshot_result(result).await?;
Expand Down Expand Up @@ -1405,7 +1421,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
&mut self,
input_ref_entries: &'e [Ent],
cur: &mut usize,
cmd: &Command<C::NodeId, C::Node>,
cmd: Command<C::NodeId, C::Node>,
) -> Result<(), StorageError<C::NodeId>>
where
Ent: RaftLogId<C::NodeId> + Sync + Send + 'e,
Expand Down Expand Up @@ -1444,32 +1460,32 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
}
Command::AppendBlankLog { log_id } => {
let ent = Entry {
log_id: *log_id,
log_id,
payload: EntryPayload::Blank,
};
let entry_refs = vec![&ent];
self.storage.append_to_log(&entry_refs).await?
}
Command::MoveInputCursorBy { n } => *cur += n,
Command::SaveVote { vote } => {
self.storage.save_vote(vote).await?;
self.storage.save_vote(&vote).await?;
}
Command::InstallElectionTimer { can_be_leader } => {
self.set_next_election_time(*can_be_leader);
self.set_next_election_time(can_be_leader);
}
Command::PurgeLog { upto } => self.storage.purge_logs_upto(*upto).await?,
Command::PurgeLog { upto } => self.storage.purge_logs_upto(upto).await?,
Command::DeleteConflictLog { since } => {
self.storage.delete_conflict_logs_since(*since).await?;
self.storage.delete_conflict_logs_since(since).await?;
}
// TODO(2): Engine initiate a snapshot building
Command::BuildSnapshot { .. } => {}
Command::SendVote { vote_req } => {
self.spawn_parallel_vote_requests(vote_req).await;
self.spawn_parallel_vote_requests(&vote_req).await;
}
Command::ReplicateCommitted { committed } => {
if let Some(l) = &self.leader_data {
for node in l.nodes.values() {
let _ = node.tx_repl.send(Replicate::Committed(*committed));
let _ = node.tx_repl.send(Replicate::Committed(committed));
}
} else {
unreachable!("it has to be a leader!!!");
Expand All @@ -1492,26 +1508,23 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
// TODO(2): consider remove the returned error from new_client().
// Node may not exist because `RaftNetworkFactory::new_client()` returns an
// error.
let node = &l.nodes.get(target);
let node = &l.nodes.get(&target);

if let Some(node) = node {
match req {
Inflight::None => {
unreachable!("Inflight::None");
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::Logs {
id: *id,
log_id_range: *log_id_range,
});
let _ = node.tx_repl.send(Replicate::Logs { id, log_id_range });
}
Inflight::Snapshot { id, last_log_id } => {
let snapshot = self.storage.get_current_snapshot().await?;
tracing::debug!("snapshot: {}", snapshot.as_ref().map(|x| &x.meta).summary());

if let Some(snapshot) = snapshot {
debug_assert_eq!(last_log_id, &snapshot.meta.last_log_id);
let _ = node.tx_repl.send(Replicate::Snapshot { id: *id, snapshot });
debug_assert_eq!(last_log_id, snapshot.meta.last_log_id);
let _ = node.tx_repl.send(Replicate::Snapshot { id, snapshot });
} else {
unreachable!("No snapshot");
}
Expand Down Expand Up @@ -1544,7 +1557,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
}
}
Command::UpdateProgressMetrics { target, matching } => {
self.update_progress_metrics(*target, *matching);
self.update_progress_metrics(target, matching);
}
Command::UpdateMembership { .. } => {
// TODO: not used
Expand All @@ -1557,12 +1570,21 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
let snapshot_data = self.received_snapshot.remove(&snapshot_meta.snapshot_id);

if let Some(data) = snapshot_data {
self.storage.install_snapshot(snapshot_meta, data).await?;
self.storage.install_snapshot(&snapshot_meta, data).await?;
tracing::debug!("Done install_snapshot, meta: {:?}", snapshot_meta);
} else {
unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta)
}
}
Command::SendVoteResult { res, tx } => {
let _ = tx.send(res);
}
Command::SendAppendEntriesResult { res, tx } => {
let _ = tx.send(res);
}
Command::SendInstallSnapshotResult { res, tx } => {
let _ = tx.send(res);
}
}

Ok(())
Expand Down
Loading

0 comments on commit 22f01b2

Please sign in to comment.