Skip to content

Commit

Permalink
Feature: Raft::enable_tick() to enable or disable election timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 31, 2022
1 parent f62315b commit 86eb298
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 32 deletions.
1 change: 1 addition & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ pub use server_state::ServerState;
pub(crate) use snapshot_state::SnapshotState;
pub(crate) use snapshot_state::SnapshotUpdate;
pub(crate) use tick::Tick;
pub(crate) use tick::TickHandle;
pub(crate) use tick::VoteWiseTime;
76 changes: 51 additions & 25 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! tick emitter emits a `RaftMsg::Tick` event at a certain interval.
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep_until;
use tokio::time::Instant;
use tracing::Level;
use tracing::Span;
use tracing_futures::Instrument;

use crate::raft::RaftMsg;
use crate::NodeId;
Expand Down Expand Up @@ -44,6 +43,7 @@ impl<NID: NodeId> VoteWiseTime<NID> {
}
}

/// Emit RaftMsg::Tick event at regular `interval`.
pub(crate) struct Tick<C, N, S>
where
C: RaftTypeConfig,
Expand All @@ -53,6 +53,13 @@ where
interval: Duration,

tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>,

/// Emit event or not
running: Arc<AtomicBool>,
}

pub(crate) struct TickHandle {
running: Arc<AtomicBool>,
}

impl<C, N, S> Tick<C, N, S>
Expand All @@ -61,27 +68,46 @@ where
N: RaftNetworkFactory<C>,
S: RaftStorage<C>,
{
pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>) -> JoinHandle<()> {
let t = Tick { interval, tx };

tokio::spawn(
async move {
let mut i = 0;
loop {
i += 1;

let at = Instant::now() + t.interval;
sleep_until(at).await;

let send_res = t.tx.send(RaftMsg::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Tick fails to send, receiving end quit.");
} else {
tracing::debug!("Tick sent: {}", i)
}
}
pub(crate) fn new(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>, enabled: bool) -> Self {
Tick {
interval,
running: Arc::new(AtomicBool::from(enabled)),
tx,
}
}

pub(crate) async fn tick_loop(self) {
let mut i = 0;
loop {
i += 1;

let at = Instant::now() + self.interval;
sleep_until(at).await;

if !self.running.load(Ordering::Relaxed) {
i -= 1;
continue;
}
.instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")),
)

let send_res = self.tx.send(RaftMsg::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Tick fails to send, receiving end quit.");
} else {
tracing::debug!("Tick sent: {}", i)
}
}
}

/// Return a handle to control the ticker.
pub(crate) fn get_handle(&self) -> TickHandle {
TickHandle {
running: self.running.clone(),
}
}
}

impl TickHandle {
pub(crate) fn enable(&self, enabled: bool) {
self.running.store(enabled, Ordering::Relaxed);
}
}
23 changes: 22 additions & 1 deletion openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ use tokio::sync::watch;
use tokio::sync::Mutex;
use tokio::task::JoinError;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::config::Config;
use crate::core::replication_lag;
use crate::core::Expectation;
use crate::core::RaftCore;
use crate::core::SnapshotUpdate;
use crate::core::Tick;
use crate::core::TickHandle;
use crate::error::AddLearnerError;
use crate::error::AppendEntriesError;
use crate::error::CheckIsLeaderError;
Expand Down Expand Up @@ -124,6 +127,7 @@ enum CoreState<NID: NodeId> {
struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
id: C::NodeId,
config: Arc<Config>,
tick_handle: TickHandle,
tx_api: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId>>,
// TODO(xp): it does not need to be a async mutex.
Expand Down Expand Up @@ -184,7 +188,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_shutdown, rx_shutdown) = oneshot::channel();

let _tick_handle = Tick::spawn(Duration::from_millis(config.heartbeat_interval * 3 / 2), tx_api.clone());
let tick = Tick::new(
Duration::from_millis(config.heartbeat_interval * 3 / 2),
tx_api.clone(),
true,
);

let tick_handle = tick.get_handle();
let _tick_join_handle =
tokio::spawn(tick.tick_loop().instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")));

let core_handle = RaftCore::spawn(
id,
Expand All @@ -200,6 +212,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let inner = RaftInner {
id,
config,
tick_handle,
tx_api,
rx_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
Expand All @@ -210,6 +223,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
Self { inner: Arc::new(inner) }
}

/// Enable or disable raft internal ticker.
///
/// The internal ticker triggers all timeout based event, e.g. election event or heartbeat event.
/// By disabling the ticker, a follower will not enter candidate again, a leader will not send heartbeat.
pub fn enable_tick(&self, enabled: bool) {
self.inner.tick_handle.enable(enabled);
}

/// Submit an AppendEntries RPC to this Raft node.
///
/// These RPCs are sent by the cluster leader to replicate log entries (§5.3), and are also
Expand Down
7 changes: 1 addition & 6 deletions openraft/tests/membership/t30_step_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,11 @@ async fn step_down() -> Result<()> {
);
let mut router = RaftRouter::new(config.clone());

let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?;
let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {2,3}).await?;

// Submit a config change which adds two new nodes and removes the current leader.
let orig_leader = router.leader().expect("expected the cluster to have a leader");
assert_eq!(0, orig_leader, "expected original leader to be node 0");
router.new_raft_node(2);
router.new_raft_node(3);
router.add_learner(0, 2).await?;
router.add_learner(0, 3).await?;
log_index += 2;
router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?;

let node = router.get_raft_handle(&orig_leader)?;
Expand Down

0 comments on commit 86eb298

Please sign in to comment.