From 6098f5cf61b074c8fccecf722278183c3ee5cd27 Mon Sep 17 00:00:00 2001 From: wvwwvwwv Date: Wed, 21 Jun 2023 13:25:14 +0900 Subject: [PATCH] Change: add AsyncRuntime type parameter to RaftTypeConfig (#869) - Add AsyncRuntime type parameter to RaftTypeConfig This commit introduces the AsyncRuntime type parameter to RaftTypeConfig, allowing applications to choose their preferred asynchronous runtime, such as tokio or async-std. - Parameterize Instant type for flexibility with async runtimes To create a more flexible interface between the crate and asynchronous runtimes, the Instant type is now associated with the async runtime. This is because Instant::now can have different implementations. This commit adds the trait Instant and TokioInstant for representing system states. - Fix: #741 --- cluster_benchmark/tests/benchmark/store.rs | 3 +- examples/raft-kv-memstore/src/lib.rs | 3 +- examples/raft-kv-rocksdb/src/lib.rs | 3 +- memstore/src/lib.rs | 3 +- openraft/src/async_runtime.rs | 112 ++++++++++++++++++ openraft/src/compat/compat07.rs | 4 +- openraft/src/core/raft_core.rs | 44 ++++--- openraft/src/core/sm/mod.rs | 9 +- openraft/src/core/tick.rs | 30 +++-- .../docs/getting_started/getting-started.md | 17 +-- openraft/src/engine/engine_impl.rs | 20 ++-- .../following_handler/append_entries_test.rs | 4 +- .../install_snapshot_test.rs | 6 +- .../engine/handler/following_handler/mod.rs | 7 +- .../receive_snapshot_chunk_test.rs | 4 +- .../leader_handler/append_entries_test.rs | 4 +- .../src/engine/handler/leader_handler/mod.rs | 6 +- .../leader_handler/send_heartbeat_test.rs | 4 +- .../src/engine/handler/log_handler/mod.rs | 3 +- .../append_membership_test.rs | 8 +- .../engine/handler/replication_handler/mod.rs | 18 +-- .../update_matching_test.rs | 4 +- .../handler/server_state_handler/mod.rs | 3 +- .../update_server_state_test.rs | 6 +- .../engine/handler/snapshot_handler/mod.rs | 3 +- .../handler/vote_handler/accept_vote_test.rs | 4 +- .../vote_handler/handle_message_vote_test.rs | 8 +- .../src/engine/handler/vote_handler/mod.rs | 24 ++-- openraft/src/engine/testing.rs | 2 + .../src/engine/tests/append_entries_test.rs | 8 +- openraft/src/engine/tests/elect_test.rs | 4 +- .../src/engine/tests/handle_vote_req_test.rs | 6 +- .../src/engine/tests/handle_vote_resp_test.rs | 24 ++-- openraft/src/engine/tests/initialize_test.rs | 4 +- openraft/src/engine/tests/startup_test.rs | 6 +- openraft/src/instant.rs | 42 +++++++ openraft/src/internal_server_state.rs | 27 +++-- openraft/src/leader/leader.rs | 24 ++-- openraft/src/leader/voting.rs | 21 ++-- openraft/src/lib.rs | 6 + openraft/src/metrics/wait.rs | 16 ++- openraft/src/metrics/wait_test.rs | 8 +- openraft/src/raft.rs | 39 ++++-- openraft/src/raft_state/mod.rs | 46 +++++-- .../tests/forward_to_leader_test.rs | 8 +- .../raft_state/tests/log_state_reader_test.rs | 25 ++-- .../src/raft_state/tests/validate_test.rs | 3 +- openraft/src/replication/mod.rs | 46 +++---- openraft/src/replication/response.rs | 3 +- openraft/src/storage/helper.rs | 11 +- openraft/src/testing/suite.rs | 3 +- openraft/src/utime.rs | 41 ++++--- rocksstore-compat07/src/lib.rs | 3 +- rocksstore/src/lib.rs | 3 +- sledstore/src/lib.rs | 3 +- stores/rocksstore-v2/src/lib.rs | 3 +- .../append_entries/t60_enable_heartbeat.rs | 8 +- .../t61_heartbeat_reject_vote.rs | 8 +- tests/tests/fixtures/mod.rs | 6 +- 59 files changed, 556 insertions(+), 265 deletions(-) create mode 100644 openraft/src/async_runtime.rs create mode 100644 openraft/src/instant.rs diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index b3a3e79ae..141a713ac 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -25,6 +25,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -40,7 +41,7 @@ pub type NodeId = u64; openraft::declare_raft_types!( pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); #[derive(Debug)] diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index d681983ad..328a52ad6 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -11,6 +11,7 @@ use actix_web::HttpServer; use openraft::storage::Adaptor; use openraft::BasicNode; use openraft::Config; +use openraft::TokioRuntime; use crate::app::App; use crate::network::api; @@ -31,7 +32,7 @@ pub type NodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, - Entry = openraft::Entry, SnapshotData = Cursor> + Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); pub type LogStore = Adaptor>; diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index 0cf12d80b..afbc82672 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -10,6 +10,7 @@ use async_std::net::TcpListener; use async_std::task; use openraft::storage::Adaptor; use openraft::Config; +use openraft::TokioRuntime; use crate::app::App; use crate::network::api; @@ -41,7 +42,7 @@ impl Display for Node { openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, - Entry = openraft::Entry, SnapshotData = Cursor> + Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); pub type LogStore = Adaptor>; diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 9affd4cb9..113195c3d 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -26,6 +26,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -74,7 +75,7 @@ pub type MemNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); /// The application snapshot type which the `MemStore` works with. diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs new file mode 100644 index 000000000..46ecbaeee --- /dev/null +++ b/openraft/src/async_runtime.rs @@ -0,0 +1,112 @@ +use std::fmt::Debug; +use std::fmt::Display; +use std::future::Future; +use std::time::Duration; + +use crate::Instant; +use crate::TokioInstant; + +/// A trait defining interfaces with an asynchronous runtime. +/// +/// The intention of this trait is to allow an application using this crate to bind an asynchronous +/// runtime that suits it the best. +/// +/// ## Note +/// +/// The default asynchronous runtime is `tokio`. +pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static { + /// The error type of [`Self::JoinHandle`]. + type JoinError: Debug + Display + Send; + + /// The return type of [`Self::spawn`]. + type JoinHandle: Future> + Send + Sync + Unpin; + + /// The type that enables the user to sleep in an asynchronous runtime. + type Sleep: Future + Send + Sync; + + /// A measurement of a monotonically non-decreasing clock. + type Instant: Instant; + + /// The timeout error type. + type TimeoutError: Debug + Display + Send; + + /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user + /// to await the outcome of a [`Future`]. + type Timeout + Send>: Future> + Send; + + /// Spawn a new task. + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static; + + /// Wait until `duration` has elapsed. + fn sleep(duration: Duration) -> Self::Sleep; + + /// Wait until `deadline` is reached. + fn sleep_until(deadline: Self::Instant) -> Self::Sleep; + + /// Require a [`Future`] to complete before the specified duration has elapsed. + fn timeout + Send>(duration: Duration, future: F) -> Self::Timeout; + + /// Require a [`Future`] to complete before the specified instant in time. + fn timeout_at + Send>(deadline: Self::Instant, future: F) -> Self::Timeout; + + /// Check if the [`Self::JoinError`] is `panic`. + fn is_panic(join_error: &Self::JoinError) -> bool; + + /// Abort the task associated with the supplied join handle. + fn abort(join_handle: &Self::JoinHandle); +} + +/// `Tokio` is the default asynchronous executor. +#[derive(Debug, Default)] +pub struct TokioRuntime; + +impl AsyncRuntime for TokioRuntime { + type JoinError = tokio::task::JoinError; + type JoinHandle = tokio::task::JoinHandle; + type Sleep = tokio::time::Sleep; + type Instant = TokioInstant; + type TimeoutError = tokio::time::error::Elapsed; + type Timeout + Send> = tokio::time::Timeout; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + tokio::task::spawn(future) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + tokio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + tokio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + Send>(duration: Duration, future: F) -> Self::Timeout { + tokio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + Send>(deadline: Self::Instant, future: F) -> Self::Timeout { + tokio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(join_error: &Self::JoinError) -> bool { + join_error.is_panic() + } + + #[inline] + fn abort(join_handle: &Self::JoinHandle) { + join_handle.abort(); + } +} diff --git a/openraft/src/compat/compat07.rs b/openraft/src/compat/compat07.rs index 73e1d475e..fc19e4fee 100644 --- a/openraft/src/compat/compat07.rs +++ b/openraft/src/compat/compat07.rs @@ -421,6 +421,7 @@ mod tests { use crate::compat::Upgrade; use crate::CommittedLeaderId; + use crate::TokioRuntime; #[test] fn test_serde_log_id() -> anyhow::Result<()> { @@ -511,7 +512,8 @@ mod tests { crate::declare_raft_types!( pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, - Entry = crate::Entry, SnapshotData = Cursor> + Entry = crate::Entry, SnapshotData = Cursor>, + AsyncRuntime = TokioRuntime ); #[test] diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 02d5a39d3..02cbd9e5b 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -6,6 +6,7 @@ use std::fmt::Formatter; use std::marker::PhantomData; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use anyerror::AnyError; use futures::stream::FuturesUnordered; @@ -16,9 +17,6 @@ use tokio::select; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; -use tokio::time::timeout; -use tokio::time::Duration; -use tokio::time::Instant; use tracing::Instrument; use tracing::Level; use tracing::Span; @@ -84,7 +82,9 @@ use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::ChangeMembers; +use crate::Instant; use crate::LogId; use crate::Membership; use crate::MessageSummary; @@ -140,7 +140,7 @@ pub(crate) struct LeaderData { pub(super) replications: BTreeMap>, /// The time to send next heartbeat. - pub(crate) next_heartbeat: Instant, + pub(crate) next_heartbeat: ::Instant, } impl LeaderData { @@ -148,7 +148,7 @@ impl LeaderData { Self { client_resp_channels: Default::default(), replications: BTreeMap::new(), - next_heartbeat: Instant::now(), + next_heartbeat: ::Instant::now(), } } } @@ -308,7 +308,7 @@ where let option = RPCOption::new(ttl); let fu = async move { - let outer_res = timeout(ttl, client.append_entries(rpc, option)).await; + let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await; match outer_res { Ok(append_res) => match append_res { Ok(x) => Ok((target, x)), @@ -328,7 +328,7 @@ where }; let fu = fu.instrument(tracing::debug_span!("spawn_is_leader", target = target.to_string())); - let task = tokio::spawn(fu).map_err(move |err| (target, err)); + let task = C::AsyncRuntime::spawn(fu).map_err(move |err| (target, err)); pending.push(task); } @@ -392,7 +392,7 @@ where .into())); }; - tokio::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting"))); + C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting"))); } /// Submit change-membership by writing a Membership log entry. @@ -470,14 +470,21 @@ where /// Currently heartbeat is a blank log #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { - tracing::debug!(now = debug(Instant::now()), "send_heartbeat"); + tracing::debug!( + now = debug(::Instant::now()), + "send_heartbeat" + ); let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject::<(), ClientWriteError>(None) { lh } else { - tracing::debug!(now = debug(Instant::now()), "{} failed to send heartbeat", emitter); + tracing::debug!( + now = debug(::Instant::now()), + "{} failed to send heartbeat", + emitter + ); return false; }; @@ -988,9 +995,9 @@ where let id = self.id; let option = RPCOption::new(ttl); - tokio::spawn( + C::AsyncRuntime::spawn( async move { - let tm_res = timeout(ttl, client.vote(req, option)).await; + let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await; let res = match tm_res { Ok(res) => res, @@ -1062,7 +1069,7 @@ where self.handle_append_entries_request(rpc, tx); } RaftMsg::RequestVote { rpc, tx } => { - let now = Instant::now(); + let now = ::Instant::now(); tracing::info!( now = debug(now), vote_request = display(rpc.summary()), @@ -1149,7 +1156,7 @@ where resp, sender_vote: vote, } => { - let now = Instant::now(); + let now = ::Instant::now(); tracing::info!( now = debug(now), @@ -1185,7 +1192,7 @@ where Notify::Tick { i } => { // check every timer - let now = Instant::now(); + let now = ::Instant::now(); tracing::debug!("received tick: {}, now: {:?}", i, now); self.handle_tick_election(); @@ -1202,7 +1209,8 @@ where // Install next heartbeat if let Some(l) = &mut self.leader_data { - l.next_heartbeat = Instant::now() + Duration::from_millis(self.config.heartbeat_interval); + l.next_heartbeat = ::Instant::now() + + Duration::from_millis(self.config.heartbeat_interval); } } } @@ -1330,7 +1338,7 @@ where #[tracing::instrument(level = "debug", skip_all)] fn handle_tick_election(&mut self) { - let now = Instant::now(); + let now = ::Instant::now(); tracing::debug!("try to trigger election by tick, now: {:?}", now); @@ -1399,7 +1407,7 @@ where &mut self, target: C::NodeId, id: u64, - result: Result>, String>, + result: Result, ::Instant>, String>, ) { tracing::debug!( target = display(target), diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index e1db564f2..e39b652a3 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -16,6 +16,7 @@ use crate::entry::RaftPayload; use crate::raft::InstallSnapshotRequest; use crate::storage::RaftStateMachine; use crate::summary::MessageSummary; +use crate::AsyncRuntime; use crate::RaftLogId; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; @@ -41,7 +42,7 @@ where C: RaftTypeConfig { cmd_tx: mpsc::UnboundedSender>, #[allow(dead_code)] - join_handle: tokio::task::JoinHandle<()>, + join_handle: ::JoinHandle<()>, } impl Handle @@ -91,8 +92,8 @@ where Handle { cmd_tx, join_handle } } - fn do_spawn(mut self) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { + fn do_spawn(mut self) -> ::JoinHandle<()> { + C::AsyncRuntime::spawn(async move { let res = self.worker_loop().await; if let Err(err) = res { @@ -221,7 +222,7 @@ where let mut builder = self.state_machine.get_snapshot_builder().await; - let _handle = tokio::spawn(async move { + let _handle = C::AsyncRuntime::spawn(async move { let res = builder.build_snapshot().await; let res = res.map(|snap| Response::BuildSnapshot(snap.meta)); let cmd_res = CommandResult::new(seq, res); diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index ef0bbcc8f..7e388b8eb 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -6,14 +6,13 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use tokio::task::JoinHandle; -use tokio::time::sleep_until; -use tokio::time::Instant; use tracing::Instrument; use tracing::Level; use tracing::Span; use crate::core::notify::Notify; +use crate::AsyncRuntime; +use crate::Instant; use crate::RaftTypeConfig; /// Emit RaftMsg::Tick event at regular `interval`. @@ -28,23 +27,28 @@ where C: RaftTypeConfig enabled: Arc, } -pub(crate) struct TickHandle { +pub(crate) struct TickHandle +where C: RaftTypeConfig +{ enabled: Arc, - join_handle: JoinHandle<()>, + join_handle: ::JoinHandle<()>, } impl Tick where C: RaftTypeConfig { - pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender>, enabled: bool) -> TickHandle { + pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender>, enabled: bool) -> TickHandle { let enabled = Arc::new(AtomicBool::from(enabled)); let this = Self { interval, enabled: enabled.clone(), tx, }; - let join_handle = - tokio::spawn(this.tick_loop().instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick"))); + let join_handle = C::AsyncRuntime::spawn(this.tick_loop().instrument(tracing::span!( + parent: &Span::current(), + Level::DEBUG, + "tick" + ))); TickHandle { enabled, join_handle } } @@ -53,8 +57,8 @@ where C: RaftTypeConfig loop { i += 1; - let at = Instant::now() + self.interval; - sleep_until(at).await; + let at = ::Instant::now() + self.interval; + C::AsyncRuntime::sleep_until(at).await; if !self.enabled.load(Ordering::Relaxed) { i -= 1; @@ -71,12 +75,14 @@ where C: RaftTypeConfig } } -impl TickHandle { +impl TickHandle +where C: RaftTypeConfig +{ pub(crate) fn enable(&self, enabled: bool) { self.enabled.store(enabled, Ordering::Relaxed); } pub(crate) async fn shutdown(&self) { - self.join_handle.abort(); + C::AsyncRuntime::abort(&self.join_handle); } } diff --git a/openraft/src/docs/getting_started/getting-started.md b/openraft/src/docs/getting_started/getting-started.md index 418f338e3..2df1aeef6 100644 --- a/openraft/src/docs/getting_started/getting-started.md +++ b/openraft/src/docs/getting_started/getting-started.md @@ -60,6 +60,7 @@ impl openraft::RaftTypeConfig for TypeConfig { type Node = BasicNode; type Entry = openraft::Entry; type SnapshotData = Cursor>; + type AsyncRuntime = TokioRuntime; } ``` @@ -81,7 +82,7 @@ It could be a wrapper for a local key-value store like [RocksDB](https://docs.rs There is a good example, [`Mem KV Store`](https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/store/mod.rs), that demonstrates what should be done when a method is called. The list of [`RaftStorage`] methods is shown below. -Follow the link to method document to see the details. +Follow the link to method document to see the details. | Kind | [`RaftStorage`] method | Return value | Description | |------------|----------------------------------|------------------------------|---------------------------------------| @@ -98,14 +99,14 @@ Follow the link to method document to see the details. | Snapshot: | [`begin_receiving_snapshot()`] | `SnapshotData` | begin to install snapshot | | Snapshot: | [`install_snapshot()`] | () | install snapshot | | Snapshot: | [`get_current_snapshot()`] | [`Snapshot`] | get current snapshot | -| Snapshot: | [`get_snapshot_builder()`] | impl [`RaftSnapshotBuilder`] | get a snapshot builder | +| Snapshot: | [`get_snapshot_builder()`] | impl [`RaftSnapshotBuilder`] | get a snapshot builder | | | | ↳ [`build_snapshot()`] | build a snapshot from state machine | Most of the APIs are quite straightforward, except two indirect APIs: - Read logs: [`RaftStorage`] defines a method [`get_log_reader()`] to get log reader [`RaftLogReader`] : - + ```ignore trait RaftStorage { type LogReader: RaftLogReader; @@ -116,7 +117,7 @@ Most of the APIs are quite straightforward, except two indirect APIs: [`RaftLogReader`] defines the APIs to read logs, and is an also super trait of [`RaftStorage`] : - [`get_log_state()`] get latest log state from the storage; - [`try_get_log_entries()`] get log entries in a range; - + ```ignore trait RaftLogReader { async fn get_log_state(&mut self) -> Result, ...>; @@ -294,17 +295,17 @@ async fn main() { .wrap(Logger::new("%a %{User-Agent}i")) .wrap(middleware::Compress::default()) .app_data(app.clone()) - + // raft internal RPC .service(raft::append).service(raft::snapshot).service(raft::vote) - + // admin API .service(management::init) .service(management::add_learner) .service(management::change_membership) .service(management::metrics) .service(management::list_nodes) - + // application API .service(api::write).service(api::read) }) @@ -364,7 +365,7 @@ Additionally, two test scripts for setting up a cluster are available: [`RaftStorage`]: `crate::storage::RaftStorage` [`RaftStorage::LogReader`]: `crate::storage::RaftStorage::LogReader` [`RaftStorage::SnapshotBuilder`]: `crate::storage::RaftStorage::SnapshotBuilder` -[`get_log_reader()`]: `crate::storage::RaftStorage::get_log_reader` +[`get_log_reader()`]: `crate::storage::RaftStorage::get_log_reader` [`save_vote()`]: `crate::storage::RaftStorage::save_vote` [`read_vote()`]: `crate::storage::RaftStorage::read_vote` [`append_to_log()`]: `crate::storage::RaftStorage::append_to_log` diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 9012462e0..701246e18 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -1,7 +1,5 @@ use std::time::Duration; -use tokio::time::Instant; - use crate::core::ServerState; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplaySlice; @@ -39,6 +37,8 @@ use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::summary::MessageSummary; use crate::validate::Valid; +use crate::AsyncRuntime; +use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; @@ -63,7 +63,7 @@ where C: RaftTypeConfig pub(crate) config: EngineConfig, /// The state of this raft node. - pub(crate) state: Valid>, + pub(crate) state: Valid::Instant>>, // TODO: add a Voting state as a container. /// Whether a greater log id is seen during election. @@ -73,7 +73,7 @@ where C: RaftTypeConfig pub(crate) seen_greater_log: bool, /// The internal server state used by Engine. - pub(crate) internal_server_state: InternalServerState, + pub(crate) internal_server_state: InternalServerState::Instant>, /// Output entry for the runtime. pub(crate) output: EngineOutput, @@ -82,7 +82,10 @@ where C: RaftTypeConfig impl Engine where C: RaftTypeConfig { - pub(crate) fn new(init_state: RaftState, config: EngineConfig) -> Self { + pub(crate) fn new( + init_state: RaftState::Instant>, + config: EngineConfig, + ) -> Self { Self { config, state: Valid::new(init_state), @@ -180,7 +183,10 @@ where C: RaftTypeConfig // Safe unwrap(): leading state is just created let leading = self.internal_server_state.leading_mut().unwrap(); - let voting = leading.initialize_voting(self.state.last_log_id().copied()); + let voting = leading.initialize_voting( + self.state.last_log_id().copied(), + ::Instant::now(), + ); let quorum_granted = voting.grant_by(&self.config.id); @@ -230,7 +236,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_vote_req(&mut self, req: VoteRequest) -> VoteResponse { - let now = Instant::now(); + let now = ::Instant::now(); let lease = self.config.timer_config.leader_lease; let vote = self.state.vote_ref(); diff --git a/openraft/src/engine/handler/following_handler/append_entries_test.rs b/openraft/src/engine/handler/following_handler/append_entries_test.rs index 3acd153f3..ee0dd2d95 100644 --- a/openraft/src/engine/handler/following_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/append_entries_test.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use maplit::btreeset; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Engine; @@ -11,6 +10,7 @@ use crate::testing::log_id1; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -26,7 +26,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 2; - eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); eng.state.log_ids.append(log_id1(1, 1)); eng.state.log_ids.append(log_id1(2, 3)); eng.state.membership_state = MembershipState::new( diff --git a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs index 0ad778b0f..924e98f10 100644 --- a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs +++ b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::sm; use crate::engine::testing::UTConfig; @@ -15,6 +14,7 @@ use crate::EffectiveMembership; use crate::Membership; use crate::SnapshotMeta; use crate::StoredMembership; +use crate::TokioInstant; use crate::Vote; fn m12() -> Membership { @@ -29,7 +29,7 @@ fn eng() -> Engine { let mut eng = Engine::default(); eng.state.enable_validate = false; // Disable validation for incomplete state - eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); eng.state.committed = Some(log_id1(4, 5)); eng.state.log_ids = LogIdList::new(vec![ // @@ -172,7 +172,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { let mut eng = Engine::::default(); eng.state.enable_validate = false; // Disable validation for incomplete state - eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); eng.state.committed = Some(log_id1(2, 3)); eng.state.log_ids = LogIdList::new(vec![ // diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index b8b6d0540..043306a24 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -16,6 +16,7 @@ use crate::error::SnapshotMismatch; use crate::raft::InstallSnapshotRequest; use crate::raft_state::LogStateReader; use crate::raft_state::StreamingState; +use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; @@ -42,7 +43,7 @@ pub(crate) struct FollowingHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) state: &'x mut RaftState, + pub(crate) state: &'x mut RaftState::Instant>, pub(crate) output: &'x mut EngineOutput, } @@ -342,7 +343,9 @@ where C: RaftTypeConfig } // Do install: - // 1. Truncate all logs if conflict + // + // 1. Truncate all logs if conflict. + // // Unlike normal append-entries RPC, if conflicting logs are found, it is not // **necessary** to delete them. But cleaning them make the assumption of // incremental-log-id always hold, which makes it easier to debug. See: [Snapshot-replication](https://datafuselabs.github.io/openraft/replication.html#snapshot-replication) diff --git a/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs b/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs index deaaec675..41ad512ec 100644 --- a/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs +++ b/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs @@ -1,6 +1,5 @@ use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::sm; use crate::engine::testing::UTConfig; @@ -15,6 +14,7 @@ use crate::Membership; use crate::SnapshotMeta; use crate::SnapshotSegmentId; use crate::StoredMembership; +use crate::TokioInstant; use crate::Vote; fn m1234() -> Membership { @@ -25,7 +25,7 @@ fn eng() -> Engine { let mut eng = Engine::default(); eng.state.enable_validate = false; // Disable validation for incomplete state - eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); eng.state.server_state = eng.calc_server_state(); eng diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 58cbeb79b..df31a0e9d 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -4,7 +4,6 @@ use maplit::btreeset; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; #[allow(unused_imports)] use pretty_assertions::assert_str_eq; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Command; @@ -23,6 +22,7 @@ use crate::Entry; use crate::LogId; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -48,7 +48,7 @@ fn eng() -> Engine { eng.config.id = 1; eng.state.committed = Some(log_id1(0, 0)); - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(3, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1)); eng.state.log_ids.append(log_id1(1, 1)); eng.state.log_ids.append(log_id1(2, 3)); eng.state.membership_state = MembershipState::new( diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 72ceb8361..61a50188d 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -7,6 +7,7 @@ use crate::engine::EngineOutput; use crate::entry::RaftPayload; use crate::internal_server_state::LeaderQuorumSet; use crate::leader::Leading; +use crate::AsyncRuntime; use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; @@ -23,8 +24,9 @@ pub(crate) struct LeaderHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) leader: &'x mut Leading>, - pub(crate) state: &'x mut RaftState, + pub(crate) leader: + &'x mut Leading, ::Instant>, + pub(crate) state: &'x mut RaftState::Instant>, pub(crate) output: &'x mut EngineOutput, } diff --git a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs index 774d28472..dfbcf9a6e 100644 --- a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs +++ b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs @@ -4,7 +4,6 @@ use maplit::btreeset; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; #[allow(unused_imports)] use pretty_assertions::assert_str_eq; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Command; @@ -16,6 +15,7 @@ use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -32,7 +32,7 @@ fn eng() -> Engine { eng.config.id = 1; eng.state.committed = Some(log_id1(0, 0)); - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(3, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1)); eng.state.log_ids.append(log_id1(1, 1)); eng.state.log_ids.append(log_id1(2, 3)); eng.state.membership_state = MembershipState::new( diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index abbdaebe2..d3064c0b4 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -3,6 +3,7 @@ use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::raft_state::LogStateReader; use crate::summary::MessageSummary; +use crate::AsyncRuntime; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftState; @@ -16,7 +17,7 @@ pub(crate) struct LogHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) state: &'x mut RaftState, + pub(crate) state: &'x mut RaftState::Instant>, pub(crate) output: &'x mut EngineOutput, } diff --git a/openraft/src/engine/handler/replication_handler/append_membership_test.rs b/openraft/src/engine/handler/replication_handler/append_membership_test.rs index 010e53b4b..3e0829a35 100644 --- a/openraft/src/engine/handler/replication_handler/append_membership_test.rs +++ b/openraft/src/engine/handler/replication_handler/append_membership_test.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use maplit::btreeset; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -18,6 +17,7 @@ use crate::EffectiveMembership; use crate::LogId; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -47,7 +47,7 @@ fn eng() -> Engine { Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m01())), Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m23())), ); - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(2, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(2, 2)); eng.state.server_state = eng.calc_server_state(); eng } @@ -57,7 +57,7 @@ fn test_leader_append_membership_for_leader() -> anyhow::Result<()> { let mut eng = eng(); eng.state.server_state = ServerState::Leader; // Make it a real leader: voted for itself and vote is committed. - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(2, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(2, 2)); eng.vote_handler().become_leading(); eng.replication_handler().append_membership(&log_id1(3, 4), &m34()); @@ -109,7 +109,7 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()> eng.state.server_state = ServerState::Leader; // Make it a real leader: voted for itself and vote is committed. - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(2, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(2, 2)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m23_45()))); diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 4eb641683..10f19b9dc 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -1,7 +1,5 @@ use std::ops::Deref; -use tokio::time::Instant; - use crate::display_ext::DisplayOptionExt; use crate::engine::handler::log_handler::LogHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; @@ -17,6 +15,7 @@ use crate::progress::Progress; use crate::raft_state::LogStateReader; use crate::replication::ReplicationResult; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; @@ -41,8 +40,9 @@ pub(crate) struct ReplicationHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) leader: &'x mut Leading>, - pub(crate) state: &'x mut RaftState, + pub(crate) leader: + &'x mut Leading, ::Instant>, + pub(crate) state: &'x mut RaftState::Instant>, pub(crate) output: &'x mut EngineOutput, } @@ -140,7 +140,7 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: u64, - result: UTime>, + result: UTime, ::Instant>, ) { let sending_time = result.utime().unwrap(); @@ -160,7 +160,11 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: Instant) { + pub(crate) fn update_leader_vote_clock( + &mut self, + node_id: C::NodeId, + t: ::Instant, + ) { tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!()); let granted = *self @@ -271,7 +275,7 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: u64, - repl_res: Result>, String>, + repl_res: Result, ::Instant>, String>, ) { // TODO(2): test diff --git a/openraft/src/engine/handler/replication_handler/update_matching_test.rs b/openraft/src/engine/handler/replication_handler/update_matching_test.rs index 97b29015e..c21d620c5 100644 --- a/openraft/src/engine/handler/replication_handler/update_matching_test.rs +++ b/openraft/src/engine/handler/replication_handler/update_matching_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Command; @@ -15,6 +14,7 @@ use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -30,7 +30,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 2; - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(2, 1)); eng.state.membership_state = MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m01())), Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m123())), diff --git a/openraft/src/engine/handler/server_state_handler/mod.rs b/openraft/src/engine/handler/server_state_handler/mod.rs index 2ca9bb472..a604c99ed 100644 --- a/openraft/src/engine/handler/server_state_handler/mod.rs +++ b/openraft/src/engine/handler/server_state_handler/mod.rs @@ -1,6 +1,7 @@ use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::AsyncRuntime; use crate::RaftState; use crate::RaftTypeConfig; use crate::ServerState; @@ -12,7 +13,7 @@ pub(crate) struct ServerStateHandler<'st, C> where C: RaftTypeConfig { pub(crate) config: &'st EngineConfig, - pub(crate) state: &'st mut RaftState, + pub(crate) state: &'st mut RaftState::Instant>, pub(crate) output: &'st mut EngineOutput, } diff --git a/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs b/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs index feda99bdc..103d11ea2 100644 --- a/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs +++ b/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Command; @@ -13,6 +12,7 @@ use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; use crate::ServerState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -28,7 +28,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 2; - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(2, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(2, 2)); eng.state.membership_state = MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m01())), Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m123())), @@ -48,7 +48,7 @@ fn test_update_server_state_if_changed() -> anyhow::Result<()> { assert_eq!(ServerState::Leader, ssh.state.server_state); ssh.output.clear_commands(); - ssh.state.vote = UTime::new(Instant::now(), Vote::new(2, 100)); + ssh.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 100)); ssh.update_server_state_if_changed(); assert_eq!(ServerState::Follower, ssh.state.server_state); diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 7c46ef2d0..3065b5e95 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -5,6 +5,7 @@ use crate::engine::Command; use crate::engine::EngineOutput; use crate::raft_state::LogStateReader; use crate::summary::MessageSummary; +use crate::AsyncRuntime; use crate::RaftState; use crate::RaftTypeConfig; use crate::SnapshotMeta; @@ -16,7 +17,7 @@ use crate::SnapshotMeta; pub(crate) struct SnapshotHandler<'st, 'out, C> where C: RaftTypeConfig { - pub(crate) state: &'st mut RaftState, + pub(crate) state: &'st mut RaftState::Instant>, pub(crate) output: &'out mut EngineOutput, } diff --git a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs index 2d50b03bd..fa318db3b 100644 --- a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; use tokio::sync::oneshot; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -16,6 +15,7 @@ use crate::testing::log_id1; use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -36,7 +36,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 0; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state.server_state = ServerState::Candidate; eng.state .membership_state diff --git a/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs b/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs index 55a939603..200e5cf28 100644 --- a/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -14,6 +13,7 @@ use crate::testing::log_id1; use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -25,7 +25,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 0; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state.server_state = ServerState::Candidate; eng.state .membership_state @@ -57,7 +57,7 @@ fn test_handle_message_vote_reject_smaller_vote() -> anyhow::Result<()> { fn test_handle_message_vote_committed_vote() -> anyhow::Result<()> { let mut eng = eng(); eng.state.log_ids = LogIdList::new(vec![log_id1(2, 3)]); - let now = Instant::now(); + let now = TokioInstant::now(); let resp = eng.vote_handler().update_vote(&Vote::new_committed(3, 2)); @@ -86,7 +86,7 @@ fn test_handle_message_vote_granted_equal_vote() -> anyhow::Result<()> { let mut eng = eng(); eng.state.log_ids = LogIdList::new(vec![log_id1(2, 3)]); - let now = Instant::now(); + let now = TokioInstant::now(); let resp = eng.vote_handler().update_vote(&Vote::new(2, 1)); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 791780236..7993e2242 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -1,7 +1,5 @@ use std::fmt::Debug; -use tokio::time::Instant; - use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::Command; use crate::engine::EngineConfig; @@ -15,6 +13,8 @@ use crate::progress::Progress; use crate::raft::ResultSender; use crate::raft_state::LogStateReader; use crate::utime::UTime; +use crate::AsyncRuntime; +use crate::Instant; use crate::RaftState; use crate::RaftTypeConfig; use crate::Vote; @@ -30,9 +30,10 @@ pub(crate) struct VoteHandler<'st, C> where C: RaftTypeConfig { pub(crate) config: &'st EngineConfig, - pub(crate) state: &'st mut RaftState, + pub(crate) state: &'st mut RaftState::Instant>, pub(crate) output: &'st mut EngineOutput, - pub(crate) internal_server_state: &'st mut InternalServerState, + pub(crate) internal_server_state: + &'st mut InternalServerState::Instant>, } impl<'st, C> VoteHandler<'st, C> @@ -57,7 +58,10 @@ where C: RaftTypeConfig T: Debug + Eq, E: Debug + Eq, Respond: From>>, - F: Fn(&RaftState, RejectVoteRequest) -> Result, + F: Fn( + &RaftState::Instant>, + RejectVoteRequest, + ) -> Result, { let vote_res = self.update_vote(vote); @@ -100,15 +104,19 @@ where C: RaftTypeConfig if vote > self.state.vote_ref() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); - self.state.vote.update(Instant::now(), *vote); + self.state.vote.update(::Instant::now(), *vote); self.output.push_command(Command::SaveVote { vote: *vote }); } else { - self.state.vote.touch(Instant::now()); + self.state.vote.touch(::Instant::now()); } // Update vote related timer and lease. - tracing::debug!(now = debug(Instant::now()), "{}", func_name!()); + tracing::debug!( + now = debug(::Instant::now()), + "{}", + func_name!() + ); self.update_internal_server_state(); diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index a9ba2eef4..12b01ba87 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use crate::RaftTypeConfig; +use crate::TokioRuntime; /// Trivial Raft type config for Engine related unit test. #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)] @@ -13,4 +14,5 @@ impl RaftTypeConfig for UTConfig { type Node = (); type Entry = crate::Entry; type SnapshotData = Cursor>; + type AsyncRuntime = TokioRuntime; } diff --git a/openraft/src/engine/tests/append_entries_test.rs b/openraft/src/engine/tests/append_entries_test.rs index 86dc111d6..18f455808 100644 --- a/openraft/src/engine/tests/append_entries_test.rs +++ b/openraft/src/engine/tests/append_entries_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -18,6 +17,7 @@ use crate::EffectiveMembership; use crate::Entry; use crate::Membership; use crate::MembershipState; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -37,7 +37,7 @@ fn eng() -> Engine { eng.state.enable_validate = false; // Disable validation for incomplete state eng.config.id = 2; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state.log_ids.append(log_id1(1, 1)); eng.state.log_ids.append(log_id1(2, 3)); eng.state.committed = Some(log_id1(0, 0)); @@ -82,7 +82,7 @@ fn test_append_entries_vote_is_rejected() -> anyhow::Result<()> { fn test_append_entries_prev_log_id_is_applied() -> anyhow::Result<()> { // An applied log id has to be committed thus let mut eng = eng(); - eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); let res = eng.append_entries( @@ -211,7 +211,7 @@ fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> { #[test] fn test_append_entries_prev_log_id_not_exists() -> anyhow::Result<()> { let mut eng = eng(); - eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id1(2, 4)), vec![ diff --git a/openraft/src/engine/tests/elect_test.rs b/openraft/src/engine/tests/elect_test.rs index 7e549e52b..1ad23443a 100644 --- a/openraft/src/engine/tests/elect_test.rs +++ b/openraft/src/engine/tests/elect_test.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -20,6 +19,7 @@ use crate::EffectiveMembership; use crate::Entry; use crate::LogId; use crate::Membership; +use crate::TokioInstant; use crate::Vote; fn m1() -> Membership { @@ -96,7 +96,7 @@ fn test_elect() -> anyhow::Result<()> { .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(0, 1)), m1()))); // Build in-progress election state - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(1, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(1, 2)); eng.vote_handler().become_leading(); eng.internal_server_state.voting_mut().map(|l| l.grant_by(&1)); diff --git a/openraft/src/engine/tests/handle_vote_req_test.rs b/openraft/src/engine/tests/handle_vote_req_test.rs index 85647795a..ba65258f7 100644 --- a/openraft/src/engine/tests/handle_vote_req_test.rs +++ b/openraft/src/engine/tests/handle_vote_req_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -15,6 +14,7 @@ use crate::testing::log_id1; use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; +use crate::TokioInstant; use crate::Vote; fn m01() -> Membership { @@ -27,7 +27,7 @@ fn eng() -> Engine { eng.config.id = 1; // By default expire the leader lease so that the vote can be overridden in these tests. - eng.state.vote = UTime::new(Instant::now() - Duration::from_millis(300), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now() - Duration::from_millis(300), Vote::new(2, 1)); eng.state.server_state = ServerState::Candidate; eng.state .membership_state @@ -40,7 +40,7 @@ fn eng() -> Engine { #[test] fn test_handle_vote_req_rejected_by_leader_lease() -> anyhow::Result<()> { let mut eng = eng(); - eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1)); + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); let resp = eng.handle_vote_req(VoteRequest { vote: Vote::new(3, 2), diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index dd4ebaf26..97203078d 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -23,6 +22,7 @@ use crate::EffectiveMembership; use crate::Entry; use crate::LogId; use crate::Membership; +use crate::TokioInstant; use crate::Vote; fn m12() -> Membership { @@ -47,7 +47,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.state.server_state = ServerState::Follower; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12()))); @@ -70,7 +70,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.config.id = 1; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12()))); @@ -79,7 +79,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { let last_log_id = eng.state.last_log_id().copied(); eng.internal_server_state.leading_mut().map(|l| { - l.initialize_voting(last_log_id); + l.initialize_voting(last_log_id, TokioInstant::now()); l.voting_mut().unwrap().grant_by(&1) }); eng.state.server_state = ServerState::Candidate; @@ -106,7 +106,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.config.id = 1; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state.log_ids = LogIdList::new(vec![log_id1(3, 3)]); eng.state .membership_state @@ -116,7 +116,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { let last_log_id = eng.state.last_log_id().copied(); eng.internal_server_state.leading_mut().map(|l| { - l.initialize_voting(last_log_id); + l.initialize_voting(last_log_id, TokioInstant::now()); l.voting_mut().unwrap().grant_by(&1) }); eng.state.server_state = ServerState::Candidate; @@ -142,7 +142,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.config.id = 1; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12()))); @@ -151,7 +151,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { let last_log_id = eng.state.last_log_id().copied(); eng.internal_server_state.leading_mut().map(|l| { - l.initialize_voting(last_log_id); + l.initialize_voting(last_log_id, TokioInstant::now()); l.voting_mut().unwrap().grant_by(&1) }); @@ -178,7 +178,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.config.id = 1; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m1234()))); @@ -187,7 +187,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { let last_log_id = eng.state.last_log_id().copied(); eng.internal_server_state.leading_mut().map(|l| { - l.initialize_voting(last_log_id); + l.initialize_voting(last_log_id, TokioInstant::now()); l.voting_mut().unwrap().grant_by(&1) }); @@ -214,7 +214,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { { let mut eng = eng(); eng.config.id = 1; - eng.state.vote = UTime::new(Instant::now(), Vote::new(2, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 1)); eng.state .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12()))); @@ -223,7 +223,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { let last_log_id = eng.state.last_log_id().copied(); eng.internal_server_state.leading_mut().map(|l| { - l.initialize_voting(last_log_id); + l.initialize_voting(last_log_id, TokioInstant::now()); l.voting_mut().unwrap().grant_by(&1) }); diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index 87dfe4230..0ad5d4c9e 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -1,6 +1,5 @@ use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::time::Instant; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -20,6 +19,7 @@ use crate::vote::CommittedLeaderId; use crate::Entry; use crate::LogId; use crate::Membership; +use crate::TokioInstant; use crate::Vote; #[test] @@ -168,7 +168,7 @@ fn test_initialize() -> anyhow::Result<()> { tracing::info!("--- not allowed because of vote"); { let mut eng = eng(); - eng.state.vote = UTime::new(Instant::now(), Vote::new(0, 1)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(0, 1)); assert_eq!( Err(InitializeError::NotAllowed(NotAllowed { diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 872369a73..4b20ce942 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use maplit::btreeset; -use tokio::time::Instant; use crate::engine::testing::UTConfig; use crate::engine::Command; @@ -13,6 +12,7 @@ use crate::utime::UTime; use crate::EffectiveMembership; use crate::Membership; use crate::ServerState; +use crate::TokioInstant; use crate::Vote; fn m23() -> Membership { @@ -39,7 +39,7 @@ fn test_startup_as_leader() -> anyhow::Result<()> { .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m23()))); // Committed vote makes it a leader at startup. - eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(1, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(1, 2)); eng.startup(); @@ -71,7 +71,7 @@ fn test_startup_candidate_becomes_follower() -> anyhow::Result<()> { .membership_state .set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(2, 3)), m23()))); // Non-committed vote makes it a candidate at startup. - eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new(1, 2)); eng.startup(); diff --git a/openraft/src/instant.rs b/openraft/src/instant.rs new file mode 100644 index 000000000..cfa4e6eb1 --- /dev/null +++ b/openraft/src/instant.rs @@ -0,0 +1,42 @@ +use std::fmt::Debug; +use std::ops::Add; +use std::ops::AddAssign; +use std::ops::Sub; +use std::ops::SubAssign; +use std::panic::RefUnwindSafe; +use std::panic::UnwindSafe; +use std::time::Duration; + +/// A measurement of a monotonically non-decreasing clock. +pub trait Instant: + Add + + AddAssign + + Clone + + Copy + + Debug + + Eq + + Ord + + PartialEq + + PartialOrd + + RefUnwindSafe + + Send + + Sub + + Sub + + SubAssign + + Sync + + Unpin + + UnwindSafe + + 'static +{ + /// Return the current instant. + fn now() -> Self; +} + +pub type TokioInstant = tokio::time::Instant; + +impl Instant for tokio::time::Instant { + #[inline] + fn now() -> Self { + tokio::time::Instant::now() + } +} diff --git a/openraft/src/internal_server_state.rs b/openraft/src/internal_server_state.rs index a58d879bc..d0fd40617 100644 --- a/openraft/src/internal_server_state.rs +++ b/openraft/src/internal_server_state.rs @@ -1,6 +1,7 @@ use crate::leader::voting::Voting; use crate::leader::Leading; use crate::quorum::Joint; +use crate::Instant; use crate::NodeId; /// The quorum set type used by `Leader`. @@ -22,13 +23,15 @@ pub(crate) type LeaderQuorumSet = Joint, Vec>>; #[derive(PartialEq, Eq)] #[allow(clippy::large_enum_variant)] // TODO(9): consider moving Leader to a Box -pub(crate) enum InternalServerState -where NID: NodeId +pub(crate) enum InternalServerState +where + NID: NodeId, + I: Instant, { /// Leader or candidate. /// /// `vote.committed==true` means it is a leader. - Leading(Leading>), + Leading(Leading, I>), /// Follower or learner. /// @@ -36,32 +39,36 @@ where NID: NodeId Following, } -impl Default for InternalServerState -where NID: NodeId +impl Default for InternalServerState +where + NID: NodeId, + I: Instant, { fn default() -> Self { Self::Following } } -impl InternalServerState -where NID: NodeId +impl InternalServerState +where + NID: NodeId, + I: Instant, { - pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting>> { + pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting, I>> { match self { InternalServerState::Leading(l) => l.voting_mut(), InternalServerState::Following => None, } } - pub(crate) fn leading(&self) -> Option<&Leading>> { + pub(crate) fn leading(&self) -> Option<&Leading, I>> { match self { InternalServerState::Leading(l) => Some(l), InternalServerState::Following => None, } } - pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading>> { + pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading, I>> { match self { InternalServerState::Leading(l) => Some(l), InternalServerState::Following => None, diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index 65fcd48aa..cc05c280a 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -1,13 +1,12 @@ use std::fmt; use std::ops::Deref; -use tokio::time::Instant; - use crate::leader::voting::Voting; use crate::progress::entry::ProgressEntry; use crate::progress::VecProgress; use crate::quorum::QuorumSet; use crate::utime::UTime; +use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::NodeId; @@ -29,16 +28,16 @@ use crate::Vote; /// But instead it will be able to upgrade its `leader_id` without losing leadership. #[derive(Clone, Debug)] #[derive(PartialEq, Eq)] -pub(crate) struct Leading> { +pub(crate) struct Leading, I: Instant> { // TODO(1): set the utime, // TODO(1): update it when heartbeat is granted by a quorum /// The vote this leader works in. - pub(crate) vote: UTime>, + pub(crate) vote: UTime, I>, quorum_set: QS, /// Voting state, i.e., there is a Candidate running. - voting: Option>, + voting: Option>, /// Tracks the replication progress and committed index pub(crate) progress: VecProgress, Option>, QS>, @@ -48,13 +47,14 @@ pub(crate) struct Leading> { /// See [`docs::leader_lease`] for more details. /// /// [`docs::leader_lease`]: `crate::docs::protocol::replication::leader_lease` - pub(crate) clock_progress: VecProgress, Option, QS>, + pub(crate) clock_progress: VecProgress, Option, QS>, } -impl Leading +impl Leading where NID: NodeId, QS: QuorumSet + Clone + fmt::Debug + 'static, + I: Instant, { pub(crate) fn new( vote: Vote, @@ -78,18 +78,18 @@ where } #[allow(dead_code)] - pub(crate) fn voting(&self) -> Option<&Voting> { + pub(crate) fn voting(&self) -> Option<&Voting> { self.voting.as_ref() } #[allow(dead_code)] - pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting> { + pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting> { self.voting.as_mut() } - pub(crate) fn initialize_voting(&mut self, last_log_id: Option>) -> &mut Voting { + pub(crate) fn initialize_voting(&mut self, last_log_id: Option>, now: I) -> &mut Voting { self.voting = Some(Voting::new( - Instant::now(), + now, *self.vote.deref(), last_log_id, self.quorum_set.clone(), @@ -98,7 +98,7 @@ where } /// Finish the voting process and return the state. - pub(crate) fn finish_voting(&mut self) -> Voting { + pub(crate) fn finish_voting(&mut self) -> Voting { // it has to be in voting progress self.voting.take().unwrap() } diff --git a/openraft/src/leader/voting.rs b/openraft/src/leader/voting.rs index c29051017..33fb0178b 100644 --- a/openraft/src/leader/voting.rs +++ b/openraft/src/leader/voting.rs @@ -1,11 +1,10 @@ use std::fmt; -use tokio::time::Instant; - use crate::display_ext::DisplayOptionExt; use crate::progress::Progress; use crate::progress::VecProgress; use crate::quorum::QuorumSet; +use crate::Instant; use crate::LogId; use crate::NodeId; use crate::Vote; @@ -13,13 +12,14 @@ use crate::Vote; /// Voting state. #[derive(Clone, Debug)] #[derive(PartialEq, Eq)] -pub(crate) struct Voting +pub(crate) struct Voting where NID: NodeId, QS: QuorumSet, + I: Instant, { /// When the voting is started. - starting_time: Instant, + starting_time: I, /// The vote. vote: Vote, @@ -30,10 +30,11 @@ where progress: VecProgress, } -impl fmt::Display for Voting +impl fmt::Display for Voting where NID: NodeId, QS: QuorumSet + fmt::Debug + 'static, + I: Instant, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( @@ -47,17 +48,13 @@ where } } -impl Voting +impl Voting where NID: NodeId, QS: QuorumSet + fmt::Debug + 'static, + I: Instant, { - pub(crate) fn new( - starting_time: Instant, - vote: Vote, - last_log_id: Option>, - quorum_set: QS, - ) -> Self { + pub(crate) fn new(starting_time: I, vote: Vote, last_log_id: Option>, quorum_set: QS) -> Self { Self { starting_time, vote, diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index ae80e6ba1..3476679b4 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -52,8 +52,10 @@ mod vote; #[cfg(feature = "compat")] pub mod compat; #[cfg(feature = "compat-07")] pub use or07; +pub mod async_runtime; pub mod entry; pub mod error; +pub mod instant; pub mod log_id; pub mod metrics; pub mod network; @@ -85,6 +87,8 @@ pub use network::RPCTypes; pub use network::RaftNetwork; pub use network::RaftNetworkFactory; +pub use crate::async_runtime::AsyncRuntime; +pub use crate::async_runtime::TokioRuntime; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; pub use crate::config::ConfigError; @@ -92,6 +96,8 @@ pub use crate::config::SnapshotPolicy; pub use crate::core::ServerState; pub use crate::entry::Entry; pub use crate::entry::EntryPayload; +pub use crate::instant::Instant; +pub use crate::instant::TokioInstant; pub use crate::log_id::LogId; pub use crate::log_id::LogIdOptionExt; pub use crate::log_id::LogIndexOptionExt; diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index b8a73d54a..3f155cadf 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -2,12 +2,13 @@ use core::time::Duration; use std::collections::BTreeSet; use tokio::sync::watch; -use tokio::time::Instant; use crate::core::ServerState; use crate::display_ext::DisplayOption; use crate::metrics::RaftMetrics; use crate::node::Node; +use crate::AsyncRuntime; +use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; @@ -26,25 +27,28 @@ pub enum WaitError { /// Wait is a wrapper of RaftMetrics channel that impls several utils to wait for metrics to satisfy /// some condition. -pub struct Wait +pub struct Wait where NID: NodeId, N: Node, + A: AsyncRuntime, { pub timeout: Duration, pub rx: watch::Receiver>, + pub(crate) _phantom: std::marker::PhantomData, } -impl Wait +impl Wait where NID: NodeId, N: Node, + A: AsyncRuntime, { /// Wait for metrics to satisfy some condition or timeout. #[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))] pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> where T: Fn(&RaftMetrics) -> bool + Send { - let timeout_at = Instant::now() + self.timeout; + let timeout_at = A::Instant::now() + self.timeout; let mut rx = self.rx.clone(); loop { @@ -67,7 +71,7 @@ where return Ok(latest); } - let now = Instant::now(); + let now = A::Instant::now(); if now >= timeout_at { return Err(WaitError::Timeout( self.timeout, @@ -77,7 +81,7 @@ where let sleep_time = timeout_at - now; tracing::debug!(?sleep_time, "wait timeout"); - let delay = tokio::time::sleep(sleep_time); + let delay = A::sleep(sleep_time); tokio::select! { _ = delay => { diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 0be9320da..af8e8ec95 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -18,6 +18,7 @@ use crate::Node; use crate::NodeId; use crate::RaftMetrics; use crate::StoredMembership; +use crate::TokioRuntime; use crate::Vote; /// Test wait for different state changes @@ -213,7 +214,11 @@ async fn test_wait_purged() -> anyhow::Result<()> { Ok(()) } -pub(crate) type InitResult = (RaftMetrics, Wait, watch::Sender>); +pub(crate) type InitResult = ( + RaftMetrics, + Wait, + watch::Sender>, +); /// Build a initial state for testing of Wait: /// Returns init metrics, Wait, and the tx to send an updated metrics. @@ -242,6 +247,7 @@ where let w = Wait { timeout: Duration::from_millis(100), rx, + _phantom: std::marker::PhantomData, }; (init, w, tx) diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index ed7d0411b..3c662683b 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -16,8 +16,6 @@ use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; use tokio::sync::Mutex; -use tokio::task::JoinError; -use tokio::task::JoinHandle; use tracing::trace_span; use tracing::Instrument; use tracing::Level; @@ -52,6 +50,7 @@ use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::AppData; use crate::AppDataResponse; +use crate::AsyncRuntime; use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; @@ -105,6 +104,9 @@ pub trait RaftTypeConfig: /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; + + /// Asynchronous runtime type. + type AsyncRuntime: AsyncRuntime; } /// Define types for a Raft type configuration. @@ -140,11 +142,13 @@ macro_rules! declare_raft_types { } /// The running state of RaftCore -enum CoreState -where NID: NodeId +enum CoreState +where + NID: NodeId, + A: AsyncRuntime, { /// The RaftCore task is still running. - Running(JoinHandle>>), + Running(A::JoinHandle>>), /// The RaftCore task has finished. The return value of the task is stored. Done(Result<(), Fatal>), @@ -159,13 +163,13 @@ where id: C::NodeId, config: Arc, runtime_config: Arc, - tick_handle: TickHandle, + tick_handle: TickHandle, tx_api: mpsc::UnboundedSender>, rx_metrics: watch::Receiver>, // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] tx_shutdown: Mutex>>, - core_state: Mutex>, + core_state: Mutex>, } /// The Raft API. @@ -310,7 +314,7 @@ where _p: Default::default(), }; - let core_handle = tokio::spawn(core.main(rx_shutdown).instrument(trace_span!("spawn").or_current())); + let core_handle = C::AsyncRuntime::spawn(core.main(rx_shutdown).instrument(trace_span!("spawn").or_current())); let inner = RaftInner { id, @@ -806,7 +810,7 @@ where let core_task_res = match res { Err(err) => { - if err.is_panic() { + if C::AsyncRuntime::is_panic(&err) { Err(Fatal::Panicked) } else { Err(Fatal::Stopped) @@ -835,7 +839,11 @@ where /// /// If the API channel is already closed (Raft is in shutdown), then the request functor is /// destroyed right away and not called at all. - pub fn external_request, &mut LS, &mut N) + Send + 'static>( + pub fn external_request< + F: FnOnce(&RaftState::Instant>, &mut LS, &mut N) + + Send + + 'static, + >( &self, req: F, ) { @@ -867,7 +875,7 @@ where /// // wait for raft state to become a follower /// r.wait(None).state(State::Follower, "state").await?; /// ``` - pub fn wait(&self, timeout: Option) -> Wait { + pub fn wait(&self, timeout: Option) -> Wait { let timeout = match timeout { Some(t) => t, None => Duration::from_secs(86400 * 365 * 100), @@ -875,13 +883,14 @@ where Wait { timeout, rx: self.inner.rx_metrics.clone(), + _phantom: PhantomData, } } /// Shutdown this Raft node. /// /// It sends a shutdown signal and waits until `RaftCore` returns. - pub async fn shutdown(&self) -> Result<(), JoinError> { + pub async fn shutdown(&self) -> Result<(), ::JoinError> { if let Some(tx) = self.inner.tx_shutdown.lock().await.take() { // A failure to send means the RaftCore is already shutdown. Continue to check the task // return value. @@ -960,7 +969,11 @@ where ExternalRequest { #[allow(clippy::type_complexity)] - req: Box, &mut LS, &mut N) + Send + 'static>, + req: Box< + dyn FnOnce(&RaftState::Instant>, &mut LS, &mut N) + + Send + + 'static, + >, }, ExternalCommand { diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 7ba493246..e14aafd24 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -1,8 +1,6 @@ use std::error::Error; use std::ops::Deref; -use tokio::time::Instant; - use crate::engine::LogIdList; use crate::entry::RaftEntry; use crate::equal; @@ -12,6 +10,7 @@ use crate::log_id::RaftLogId; use crate::node::Node; use crate::utime::UTime; use crate::validate::Validate; +use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::NodeId; @@ -46,15 +45,15 @@ pub(crate) use crate::raft_state::snapshot_streaming::StreamingState; /// A struct used to represent the raft state which a Raft node needs. #[derive(Clone, Debug)] -#[derive(Default)] #[derive(PartialEq, Eq)] -pub struct RaftState +pub struct RaftState where NID: NodeId, N: Node, + I: Instant, { /// The vote state of this node. - pub(crate) vote: UTime>, + pub(crate) vote: UTime, I>, /// The LogId of the last log committed(AKA applied) to the state machine. /// @@ -94,10 +93,34 @@ where pub(crate) purge_upto: Option>, } -impl LogStateReader for RaftState +impl Default for RaftState +where + NID: NodeId, + N: Node, + I: Instant, +{ + fn default() -> Self { + Self { + vote: UTime::default(), + committed: None, + purged_next: 0, + log_ids: LogIdList::default(), + membership_state: MembershipState::default(), + snapshot_meta: SnapshotMeta::default(), + server_state: ServerState::default(), + accepted: Accepted::default(), + io_state: IOState::default(), + snapshot_streaming: None, + purge_upto: None, + } + } +} + +impl LogStateReader for RaftState where NID: NodeId, N: Node, + I: Instant, { fn get_log_id(&self, index: u64) -> Option> { self.log_ids.get(index) @@ -135,20 +158,22 @@ where } } -impl VoteStateReader for RaftState +impl VoteStateReader for RaftState where NID: NodeId, N: Node, + I: Instant, { fn vote_ref(&self) -> &Vote { self.vote.deref() } } -impl Validate for RaftState +impl Validate for RaftState where NID: NodeId, N: Node, + I: Instant, { fn validate(&self) -> Result<(), Box> { if self.purged_next == 0 { @@ -175,10 +200,11 @@ where } } -impl RaftState +impl RaftState where NID: NodeId, N: Node, + I: Instant, { /// Get a reference to the current vote. pub fn vote_ref(&self) -> &Vote { @@ -186,7 +212,7 @@ where } /// Return the last updated time of the vote. - pub fn vote_last_modified(&self) -> Option { + pub fn vote_last_modified(&self) -> Option { self.vote.utime() } diff --git a/openraft/src/raft_state/tests/forward_to_leader_test.rs b/openraft/src/raft_state/tests/forward_to_leader_test.rs index 9c5ab39cb..fb9794eda 100644 --- a/openraft/src/raft_state/tests/forward_to_leader_test.rs +++ b/openraft/src/raft_state/tests/forward_to_leader_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreemap; use maplit::btreeset; -use tokio::time::Instant; use crate::error::ForwardToLeader; use crate::utime::UTime; @@ -12,6 +11,7 @@ use crate::LogId; use crate::Membership; use crate::MembershipState; use crate::RaftState; +use crate::TokioInstant; use crate::Vote; fn log_id(term: u64, index: u64) -> LogId { @@ -28,7 +28,7 @@ fn m12() -> Membership { #[test] fn test_forward_to_leader_vote_not_committed() { let rs = RaftState { - vote: UTime::new(Instant::now(), Vote::new(1, 2)), + vote: UTime::new(TokioInstant::now(), Vote::new(1, 2)), membership_state: MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), @@ -42,7 +42,7 @@ fn test_forward_to_leader_vote_not_committed() { #[test] fn test_forward_to_leader_not_a_member() { let rs = RaftState { - vote: UTime::new(Instant::now(), Vote::new_committed(1, 3)), + vote: UTime::new(TokioInstant::now(), Vote::new_committed(1, 3)), membership_state: MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), @@ -58,7 +58,7 @@ fn test_forward_to_leader_has_leader() { let m123 = || Membership::::new(vec![btreeset! {1,2}], btreemap! {1=>4,2=>5,3=>6}); let rs = RaftState { - vote: UTime::new(Instant::now(), Vote::new_committed(1, 3)), + vote: UTime::new(TokioInstant::now(), Vote::new_committed(1, 3)), membership_state: MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m123())), Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m123())), diff --git a/openraft/src/raft_state/tests/log_state_reader_test.rs b/openraft/src/raft_state/tests/log_state_reader_test.rs index 31610b888..5a64471e9 100644 --- a/openraft/src/raft_state/tests/log_state_reader_test.rs +++ b/openraft/src/raft_state/tests/log_state_reader_test.rs @@ -3,6 +3,7 @@ use crate::raft_state::LogStateReader; use crate::CommittedLeaderId; use crate::LogId; use crate::RaftState; +use crate::TokioInstant; fn log_id(term: u64, index: u64) -> LogId { LogId:: { @@ -15,7 +16,7 @@ fn log_id(term: u64, index: u64) -> LogId { fn test_raft_state_prev_log_id() -> anyhow::Result<()> { // There is log id at 0 { - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(0, 0), log_id(1, 1), log_id(3, 4)]), ..Default::default() }; @@ -28,7 +29,7 @@ fn test_raft_state_prev_log_id() -> anyhow::Result<()> { // There is no log id at 0 { - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 1), log_id(3, 4)]), ..Default::default() }; @@ -44,7 +45,7 @@ fn test_raft_state_prev_log_id() -> anyhow::Result<()> { #[test] fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> { - let rs = RaftState::::default(); + let rs = RaftState::::default(); assert!(!rs.has_log_id(&log_id(0, 0))); @@ -53,7 +54,7 @@ fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> { #[test] fn test_raft_state_has_log_id_committed_gets_true() -> anyhow::Result<()> { - let rs = RaftState:: { + let rs = RaftState:: { committed: Some(log_id(2, 1)), ..Default::default() }; @@ -67,7 +68,7 @@ fn test_raft_state_has_log_id_committed_gets_true() -> anyhow::Result<()> { #[test] fn test_raft_state_has_log_id_in_log_id_list() -> anyhow::Result<()> { - let rs = RaftState:: { + let rs = RaftState:: { committed: Some(log_id(2, 1)), log_ids: LogIdList::new(vec![log_id(1, 2), log_id(3, 4)]), ..Default::default() @@ -87,20 +88,20 @@ fn test_raft_state_has_log_id_in_log_id_list() -> anyhow::Result<()> { #[test] fn test_raft_state_last_log_id() -> anyhow::Result<()> { - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![]), ..Default::default() }; assert_eq!(None, rs.last_log_id()); - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 2)]), ..Default::default() }; assert_eq!(Some(&log_id(1, 2)), rs.last_log_id()); - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 2), log_id(3, 4)]), ..Default::default() }; @@ -111,7 +112,7 @@ fn test_raft_state_last_log_id() -> anyhow::Result<()> { #[test] fn test_raft_state_purge_upto() -> anyhow::Result<()> { - let rs = RaftState:: { + let rs = RaftState:: { purge_upto: Some(log_id(1, 2)), ..Default::default() }; @@ -123,21 +124,21 @@ fn test_raft_state_purge_upto() -> anyhow::Result<()> { #[test] fn test_raft_state_last_purged_log_id() -> anyhow::Result<()> { - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![]), ..Default::default() }; assert_eq!(None, rs.last_purged_log_id()); - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 2)]), purged_next: 3, ..Default::default() }; assert_eq!(Some(log_id(1, 2)), rs.last_purged_log_id().copied()); - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 2), log_id(3, 4)]), purged_next: 3, ..Default::default() diff --git a/openraft/src/raft_state/tests/validate_test.rs b/openraft/src/raft_state/tests/validate_test.rs index e085a8000..c3ddfc055 100644 --- a/openraft/src/raft_state/tests/validate_test.rs +++ b/openraft/src/raft_state/tests/validate_test.rs @@ -4,6 +4,7 @@ use crate::CommittedLeaderId; use crate::LogId; use crate::RaftState; use crate::SnapshotMeta; +use crate::TokioInstant; fn log_id(term: u64, index: u64) -> LogId { LogId:: { @@ -17,7 +18,7 @@ fn test_raft_state_validate_snapshot_is_none() -> anyhow::Result<()> { // Some app does not persist snapshot, when restarted, purged is not None but snapshot_last_log_id // is None. This is a valid state and should not emit error. - let rs = RaftState:: { + let rs = RaftState:: { log_ids: LogIdList::new(vec![log_id(1, 1), log_id(3, 4)]), purged_next: 2, purge_upto: Some(log_id(1, 1)), diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 3e94e5e51..5c31f9b00 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -6,6 +6,7 @@ mod response; use std::fmt; use std::io::SeekFrom; use std::sync::Arc; +use std::time::Duration; use anyerror::AnyError; use futures::future::FutureExt; @@ -16,11 +17,6 @@ use tokio::io::AsyncSeekExt; use tokio::select; use tokio::sync::mpsc; use tokio::sync::oneshot; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use tokio::time::timeout; -use tokio::time::Duration; -use tokio::time::Instant; use tracing_futures::Instrument; use crate::config::Config; @@ -46,8 +42,10 @@ use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::ErrorSubject; use crate::ErrorVerb; +use crate::Instant; use crate::LogId; use crate::MessageSummary; use crate::NodeId; @@ -61,7 +59,7 @@ pub(crate) struct ReplicationHandle where C: RaftTypeConfig { /// The spawn handle the `ReplicationCore` task. - pub(crate) join_handle: JoinHandle>, + pub(crate) join_handle: ::JoinHandle>, /// The channel used for communicating with the replication task. pub(crate) tx_repl: mpsc::UnboundedSender>, @@ -160,7 +158,7 @@ where next_action: None, }; - let join_handle = tokio::spawn(this.main().instrument(span)); + let join_handle = C::AsyncRuntime::spawn(this.main().instrument(span)); ReplicationHandle { join_handle, tx_repl } } @@ -261,7 +259,7 @@ where Duration::from_millis(500) }); - self.backoff_drain_events(Instant::now() + duration).await?; + self.backoff_drain_events(::Instant::now() + duration).await?; } self.drain_events().await?; @@ -304,7 +302,7 @@ where logs }; - let leader_time = Instant::now(); + let leader_time = ::Instant::now(); // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { @@ -324,7 +322,7 @@ where let the_timeout = Duration::from_millis(self.config.heartbeat_interval); let option = RPCOption::new(the_timeout); - let res = timeout(the_timeout, self.network.append_entries(payload, option)).await; + let res = C::AsyncRuntime::timeout(the_timeout, self.network.append_entries(payload, option)).await; tracing::debug!("append_entries res: {:?}", res); @@ -413,7 +411,12 @@ where } } - fn update_conflicting(&mut self, request_id: Option, leader_time: Instant, conflict: LogId) { + fn update_conflicting( + &mut self, + request_id: Option, + leader_time: ::Instant, + conflict: LogId, + ) { tracing::debug!( target = display(self.target), request_id = display(request_id.display()), @@ -448,7 +451,7 @@ where fn update_matching( &mut self, request_id: Option, - leader_time: Instant, + leader_time: ::Instant, new_matching: Option>, ) { tracing::debug!( @@ -484,8 +487,11 @@ where /// In the backoff period, we should not send out any RPCs, but we should still receive events, /// in case the channel is closed, it should quit at once. #[tracing::instrument(level = "debug", skip(self))] - pub async fn backoff_drain_events(&mut self, until: Instant) -> Result<(), ReplicationClosed> { - let d = until - Instant::now(); + pub async fn backoff_drain_events( + &mut self, + until: ::Instant, + ) -> Result<(), ReplicationClosed> { + let d = until - ::Instant::now(); tracing::warn!( interval = debug(d), "{} backoff mode: drain events without processing them", @@ -493,8 +499,8 @@ where ); loop { - let sleep_duration = until - Instant::now(); - let sleep = sleep(sleep_duration); + let sleep_duration = until - ::Instant::now(); + let sleep = C::AsyncRuntime::sleep(sleep_duration); let recv = self.rx_repl.recv(); @@ -643,7 +649,7 @@ where snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(err_x)?; let n_read = snapshot.snapshot.read_buf(&mut buf).await.sto_res(err_x)?; - let leader_time = Instant::now(); + let leader_time = ::Instant::now(); let done = (offset + n_read as u64) == end; let req = InstallSnapshotRequest { @@ -672,7 +678,7 @@ where let option = RPCOption::new(snap_timeout); - let res = timeout(snap_timeout, self.network.install_snapshot(req, option)).await; + let res = C::AsyncRuntime::timeout(snap_timeout, self.network.install_snapshot(req, option)).await; let res = match res { Ok(outer_res) => match outer_res { @@ -686,7 +692,7 @@ where // Sleep a short time otherwise in test environment it is a dead-loop that // never yields. Because network implementation does // not yield. - sleep(Duration::from_millis(10)).await; + C::AsyncRuntime::sleep(Duration::from_millis(10)).await; continue; } }, @@ -699,7 +705,7 @@ where // Sleep a short time otherwise in test environment it is a dead-loop that never // yields. Because network implementation does not yield. - sleep(Duration::from_millis(10)).await; + C::AsyncRuntime::sleep(Duration::from_millis(10)).await; continue; } }; diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 7d36ea444..173083830 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,6 +1,7 @@ use crate::replication::ReplicationResult; use crate::replication::ReplicationSessionId; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::MessageSummary; use crate::RaftTypeConfig; use crate::StorageError; @@ -32,7 +33,7 @@ where C: RaftTypeConfig /// the target node. /// /// The result also track the time when this request is sent. - result: Result>, String>, + result: Result, ::Instant>, String>, /// In which session this message is sent. /// diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 0643b8647..66be9b78e 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -1,8 +1,6 @@ use std::marker::PhantomData; use std::sync::Arc; -use tokio::time::Instant; - use crate::engine::LogIdList; use crate::entry::RaftPayload; use crate::log_id::RaftLogId; @@ -11,7 +9,9 @@ use crate::raft_state::LogIOId; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::EffectiveMembership; +use crate::Instant; use crate::LogIdOptionExt; use crate::MembershipState; use crate::RaftSnapshotBuilder; @@ -57,7 +57,10 @@ where /// /// When the Raft node is first started, it will call this interface to fetch the last known /// state from stable storage. - pub async fn get_initial_state(&mut self) -> Result, StorageError> { + pub async fn get_initial_state( + &mut self, + ) -> Result::Instant>, StorageError> + { let vote = self.log_store.read_vote().await?; let vote = vote.unwrap_or_default(); @@ -97,7 +100,7 @@ where }; let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default(); - let now = Instant::now(); + let now = ::Instant::now(); Ok(RaftState { committed: last_applied, diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 4eb276368..b16c43096 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -23,6 +23,7 @@ use crate::testing::StoreBuilder; use crate::vote::CommittedLeaderId; use crate::AppData; use crate::AppDataResponse; +use crate::AsyncRuntime; use crate::LogId; use crate::Membership; use crate::NodeId; @@ -337,7 +338,7 @@ where pub async fn get_initial_state_without_init(mut store: LS, mut sm: SM) -> Result<(), StorageError> { let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; - let mut want = RaftState::default(); + let mut want = RaftState::::Instant>::default(); want.vote.update(initial.vote.utime().unwrap(), Vote::default()); assert_eq!(want, initial, "uninitialized state"); diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index 7a4ecab22..49268c7e6 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -2,16 +2,16 @@ use core::fmt; use std::ops::Deref; use std::ops::DerefMut; -use tokio::time::Instant; +use crate::Instant; /// Record the last update time for an object -#[derive(Debug, Default)] -pub(crate) struct UTime { +#[derive(Debug)] +pub(crate) struct UTime { data: T, - utime: Option, + utime: Option, } -impl fmt::Display for UTime { +impl fmt::Display for UTime { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.utime { Some(utime) => write!(f, "{}@{:?}", self.data, utime), @@ -20,7 +20,7 @@ impl fmt::Display for UTime { } } -impl Clone for UTime { +impl Clone for UTime { fn clone(&self) -> Self { Self { data: self.data.clone(), @@ -29,15 +29,24 @@ impl Clone for UTime { } } -impl PartialEq for UTime { +impl Default for UTime { + fn default() -> Self { + Self { + data: T::default(), + utime: None, + } + } +} + +impl PartialEq for UTime { fn eq(&self, other: &Self) -> bool { self.data == other.data && self.utime == other.utime } } -impl Eq for UTime {} +impl Eq for UTime {} -impl Deref for UTime { +impl Deref for UTime { type Target = T; fn deref(&self) -> &Self::Target { @@ -45,15 +54,15 @@ impl Deref for UTime { } } -impl DerefMut for UTime { +impl DerefMut for UTime { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.data } } -impl UTime { +impl UTime { /// Creates a new object that keeps track of the time when it was last updated. - pub(crate) fn new(now: Instant, data: T) -> Self { + pub(crate) fn new(now: I, data: T) -> Self { Self { data, utime: Some(now) } } @@ -63,7 +72,7 @@ impl UTime { } /// Return the last updated time of this object. - pub(crate) fn utime(&self) -> Option { + pub(crate) fn utime(&self) -> Option { self.utime } @@ -73,19 +82,19 @@ impl UTime { } /// Update the content of the object and the last updated time. - pub(crate) fn update(&mut self, now: Instant, data: T) { + pub(crate) fn update(&mut self, now: I, data: T) { self.data = data; self.utime = Some(now); } /// Update the last updated time. - pub(crate) fn touch(&mut self, now: Instant) { + pub(crate) fn touch(&mut self, now: I) { debug_assert!( Some(now) >= self.utime, "expect now: {:?}, must >= self.utime: {:?}, {:?}", now, self.utime, - self.utime.unwrap() - now + self.utime.unwrap() - now, ); self.utime = Some(now); } diff --git a/rocksstore-compat07/src/lib.rs b/rocksstore-compat07/src/lib.rs index 1b25559fc..e1826ef36 100644 --- a/rocksstore-compat07/src/lib.rs +++ b/rocksstore-compat07/src/lib.rs @@ -52,6 +52,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use rocksdb::ColumnFamily; use rocksdb::ColumnFamilyDescriptor; @@ -66,7 +67,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = EmptyNode, - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/rocksstore/src/lib.rs b/rocksstore/src/lib.rs index b3dceba80..94b06f79b 100644 --- a/rocksstore/src/lib.rs +++ b/rocksstore/src/lib.rs @@ -32,6 +32,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use rocksdb::ColumnFamily; use rocksdb::ColumnFamilyDescriptor; @@ -46,7 +47,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); /** diff --git a/sledstore/src/lib.rs b/sledstore/src/lib.rs index e7237be61..12b76d217 100644 --- a/sledstore/src/lib.rs +++ b/sledstore/src/lib.rs @@ -31,6 +31,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -41,7 +42,7 @@ pub type ExampleNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); /** diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index 6f22f42be..ea5f45f5b 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -38,6 +38,7 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; +use openraft::TokioRuntime; use openraft::Vote; use rand::Rng; use rocksdb::ColumnFamily; @@ -53,7 +54,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor> + Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); /** diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index a1c393404..2c5e72f8f 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -3,9 +3,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; -use tokio::time::sleep; -use tokio::time::Instant; +use openraft::TokioRuntime; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -30,8 +30,8 @@ async fn enable_heartbeat() -> Result<()> { node0.enable_heartbeat(true); for _i in 0..3 { - let now = Instant::now(); - sleep(Duration::from_millis(500)).await; + let now = ::Instant::now(); + TokioRuntime::sleep(Duration::from_millis(500)).await; for node_id in [1, 2, 3] { // no new log will be sent, . diff --git a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs index b33ff35c6..d94c04f9a 100644 --- a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs +++ b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs @@ -7,9 +7,9 @@ use maplit::btreeset; use openraft::raft::VoteRequest; use openraft::testing::log_id1; use openraft::Config; +use openraft::TokioInstant; use openraft::Vote; use tokio::time::sleep; -use tokio::time::Instant; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -28,12 +28,12 @@ async fn heartbeat_reject_vote() -> Result<()> { ); let mut router = RaftRouter::new(config.clone()); - let now = Instant::now(); + let now = TokioInstant::now(); sleep(Duration::from_millis(1)).await; let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {3}).await?; - let vote_modified_time = Arc::new(Mutex::new(Some(Instant::now()))); + let vote_modified_time = Arc::new(Mutex::new(Some(TokioInstant::now()))); tracing::info!(log_index, "--- leader lease is set by heartbeat"); { let m = vote_modified_time.clone(); @@ -44,7 +44,7 @@ async fn heartbeat_reject_vote() -> Result<()> { assert!(state.vote_last_modified() > Some(now)); }); - let now = Instant::now(); + let now = TokioInstant::now(); sleep(Duration::from_millis(700)).await; let m = vote_modified_time.clone(); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index b5b1036c9..98daf5bb9 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -52,6 +52,8 @@ use openraft::RaftLogReader; use openraft::RaftMetrics; use openraft::RaftState; use openraft::ServerState; +use openraft::TokioInstant; +use openraft::TokioRuntime; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; @@ -475,7 +477,7 @@ impl TypedRaftRouter { Ok(rst) } - pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { + pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { let node = { let rt = self.routing_table.lock().unwrap(); rt.get(node_id).expect("target node not found in routing table").clone().0 @@ -637,7 +639,7 @@ impl TypedRaftRouter { /// Send external request to the particular node. pub fn external_request< - F: FnOnce(&RaftState, &mut MemLogStore, &mut TypedRaftRouter) + Send + 'static, + F: FnOnce(&RaftState, &mut MemLogStore, &mut TypedRaftRouter) + Send + 'static, >( &self, target: MemNodeId,