Skip to content

Commit

Permalink
Improve: move state machine operations to another task
Browse files Browse the repository at this point in the history
State machine operations, such as applying log entries, building/installing/getting snapshot are moved to `core::sm::Worker`, which is run in a standalone task other than the one running `RaftCore`.
In this way, log io operation(mostly appending log entries) and state machine io operations(mostly applying log entries) can be paralleled.

- Log io are sitll running in `RaftCore` task.

- Snapshot receiving/streaming are removed from `RaftCore`.

- Add `IOState` to `RaftState` to track the applied log id.

  This field is used to determine whether a certain command, such as
  sending a response, can be executed after a specific log has been
  applied.

- Refactor: `leader_step_down()` can only be run when the response of the second change-membership is sent.
  Before this commit, updating the `committed` is done atomically with
  sending back response. Since thie commit, these two steps are done
  separately, because applying log entries are moved to another task.
  Therefore `leader_step_down()` must wait for these two steps to be
  finished.
  • Loading branch information
drmingdrmer committed Apr 15, 2023
1 parent e5935a3 commit 6769cdd
Show file tree
Hide file tree
Showing 33 changed files with 960 additions and 270 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
// Update the state machine.
{
let updated_state_machine: StateMachine = serde_json::from_slice(&new_snapshot.data)
.map_err(|e| StorageIOError::read_snapshot(new_snapshot.meta.signature(), &e))?;
.map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
let mut state_machine = self.state_machine.write().await;
*state_machine = updated_state_machine;
}
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ impl Store {
self.db
.put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice())
.map_err(|e| StorageError::IO {
source: StorageIOError::write_snapshot(snap.meta.signature(), &e),
source: StorageIOError::write_snapshot(Some(snap.meta.signature()), &e),
})?;
self.flush(ErrorSubject::Snapshot(snap.meta.signature()), ErrorVerb::Write)?;
self.flush(ErrorSubject::Snapshot(Some(snap.meta.signature())), ErrorVerb::Write)?;
Ok(())
}
}
Expand Down Expand Up @@ -531,7 +531,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
// Update the state machine.
{
let updated_state_machine: SerializableExampleStateMachine = serde_json::from_slice(&new_snapshot.data)
.map_err(|e| StorageIOError::read_snapshot(new_snapshot.meta.signature(), &e))?;
.map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
let mut state_machine = self.state_machine.write().await;
*state_machine = StateMachine::from_serializable(updated_state_machine, self.db.clone())?;
}
Expand Down
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
// Update the state machine.
{
let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data)
.map_err(|e| StorageIOError::read_snapshot(new_snapshot.meta.signature(), &e))?;
.map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
let mut sm = self.sm.write().await;
*sm = new_sm;
}
Expand Down
13 changes: 0 additions & 13 deletions openraft/src/core/building_state.rs

This file was deleted.

5 changes: 3 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
//! Also it receives and execute `Command` emitted by `Engine` to apply raft state to underlying
//! storage or forward messages to other raft nodes.
mod building_state;
mod raft_core;
mod replication_state;
mod server_state;
pub(crate) mod sm;
pub(crate) mod streaming_state;
mod tick;

pub(crate) mod snapshot_state;

pub(crate) use raft_core::ApplyResult;
pub(crate) use raft_core::ApplyingEntry;
pub use raft_core::RaftCore;
pub(crate) use replication_state::replication_lag;
pub use server_state::ServerState;
pub(crate) use snapshot_state::SnapshotResult;
pub(crate) use tick::Tick;
pub(crate) use tick::TickHandle;
Loading

0 comments on commit 6769cdd

Please sign in to comment.