Skip to content

Commit

Permalink
Feature: leader lease
Browse files Browse the repository at this point in the history
The leader records the most recent time point when an RPC is initiated
towards a target node. The highest timestamp associated with RPCs made
to a quorum serves as the starting time point for a leader lease.

Improve: use tokio::Instant to replace TimeState

Use `Instant` for timekeeping instead of a custom `TimeState` struct.
Because multiple components need to generate timestamp, not only the
`RaftCore`, e.g., the `ReplicationCore`. And generating a timestamp is
not in the hot path, therefore caching it introduces unnecessary
complexity.
  • Loading branch information
drmingdrmer committed May 22, 2023
1 parent 2cabdae commit e9eed21
Show file tree
Hide file tree
Showing 24 changed files with 293 additions and 146 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct Store {
impl RaftLogReader<TypeConfig> for Arc<Store> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
let last = log.iter().next_back().map(|(_, ent)| ent.log_id);

let last_purged = *self.last_purged_log_id.read().await;

Expand Down
24 changes: 6 additions & 18 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::utime::UTime;
use crate::ChangeMembers;
use crate::LogId;
use crate::Membership;
Expand Down Expand Up @@ -237,9 +238,6 @@ where
async fn do_main(&mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("raft node is initializing");

let now = Instant::now();
self.engine.timer.update_now(now);

self.engine.startup();
// It may not finish running all of the commands, if there is a command waiting for a callback.
self.run_engine_commands().await?;
Expand Down Expand Up @@ -472,18 +470,14 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(now = debug(self.engine.timer.now()), "send_heartbeat");
tracing::debug!(now = debug(Instant::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) =
self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C::NodeId, C::Node>>(None)
{
lh
} else {
tracing::debug!(
now = debug(self.engine.timer.now()),
"{} failed to send heartbeat",
emitter
);
tracing::debug!(now = debug(Instant::now()), "{} failed to send heartbeat", emitter);
return false;
};

Expand Down Expand Up @@ -1067,10 +1061,7 @@ where
self.handle_append_entries_request(rpc, tx);
}
RaftMsg::RequestVote { rpc, tx } => {
// Vote request needs to check if the lease of the last leader expired.
// Thus it is time sensitive. Update the cached time for it.
let now = Instant::now();
self.engine.timer.update_now(now);
tracing::info!(
now = debug(now),
vote_request = display(rpc.summary()),
Expand Down Expand Up @@ -1155,7 +1146,6 @@ where
sender_vote: vote,
} => {
let now = Instant::now();
self.engine.timer.update_now(now);

tracing::info!(
now = debug(now),
Expand Down Expand Up @@ -1192,8 +1182,6 @@ where
// check every timer

let now = Instant::now();
// TODO: store server start time and use relative time
self.engine.timer.update_now(now);
tracing::debug!("received tick: {}, now: {:?}", i, now);

self.handle_tick_election();
Expand Down Expand Up @@ -1338,7 +1326,7 @@ where

#[tracing::instrument(level = "debug", skip_all)]
fn handle_tick_election(&mut self) {
let now = *self.engine.timer.now();
let now = Instant::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

Expand Down Expand Up @@ -1407,7 +1395,7 @@ where
&mut self,
target: C::NodeId,
id: u64,
result: Result<ReplicationResult<C::NodeId>, String>,
result: Result<UTime<ReplicationResult<C::NodeId>>, String>,
) {
tracing::debug!(
target = display(target),
Expand Down Expand Up @@ -1607,7 +1595,7 @@ where
.map_err(|e| StorageIOError::read_snapshot(None, AnyError::error(e)))?;

// unwrap: The replication channel must not be dropped or it is a bug.
node.tx_repl.send(Replicate::snapshot(id, rx)).map_err(|_e| {
node.tx_repl.send(Replicate::snapshot(Some(id), rx)).map_err(|_e| {
StorageIOError::read_snapshot(None, AnyError::error("replication channel closed"))
})?;
}
Expand Down
1 change: 1 addition & 0 deletions openraft/src/docs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ To learn about the data structures used in Openraft and the commit protocol, see
- [`Effective membership`](`data::effective_membership`) explains when membership config takes effect;
- [`protocol`] :
- [`replication`](`protocol::replication`);
- [`leader_lease`](`protocol::replication::leader_lease`);
- [`log_replication`](`protocol::replication::log_replication`);
- [`snapshot_replication`](`protocol::replication::snapshot_replication`);

Expand Down
29 changes: 29 additions & 0 deletions openraft/src/docs/protocol/leader_lease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Leader lease

Leader lease is a mechanism to prevent split-brain scenarios in which multiple leaders are elected in a cluster.

It is implemented by the leader sending heartbeats(append-entries and install-snapshot request
can be considered as heartbeat too) to the followers.
If the followers do not receive any heartbeats within a certain time period `lease`,
they will start an election to elect a new leader.

## The lease for leader and follower

The `lease` stored on the leader is smaller than the `lease` stored on a follower.
The leader must not believe it is still valid while the follower has already started an election.

## Extend the lease

- A follower will extend the lease to `now + lease + timeout` when it receives a valid heartbeat from the leader, i.e., `leader.vote >= local.vote` .

- A leader will extend the lease to `t + lease` when a quorum grants the leader at time `t`.

When a heartbeat RPC call succeeds,
it means the target node acknowledged the leader's clock time when the RPC was sent.

If a time point `t` is acknowledged by a quorum, the leader can be sure that no
other leader existed during the period `[t, t + lease]`. The leader can then extend its
local lease to `t + lease`.

The above `timeout` is the maximum time that can be taken by an operation that relies on the lease on the leader.

4 changes: 4 additions & 0 deletions openraft/src/docs/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
pub mod replication {
#![doc = include_str!("replication.md")]

pub mod leader_lease {
#![doc = include_str!("leader_lease.md")]
}

pub mod log_replication {
#![doc = include_str!("log_replication.md")]
}
Expand Down
10 changes: 2 additions & 8 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use crate::engine::handler::replication_handler::SendNone;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::handler::vote_handler::VoteHandler;
use crate::engine::time_state;
use crate::engine::time_state::TimeState;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineOutput;
Expand Down Expand Up @@ -72,8 +70,6 @@ where C: RaftTypeConfig
/// should be greater.
pub(crate) seen_greater_log: bool,

pub(crate) timer: TimeState,

/// The internal server state used by Engine.
pub(crate) internal_server_state: InternalServerState<C::NodeId>,

Expand All @@ -85,12 +81,11 @@ impl<C> Engine<C>
where C: RaftTypeConfig
{
pub(crate) fn new(init_state: RaftState<C::NodeId, C::Node>, config: EngineConfig<C::NodeId>) -> Self {
let now = Instant::now();
// let now = Instant::now();
Self {
config,
state: Valid::new(init_state),
seen_greater_log: false,
timer: time_state::TimeState::new(now),
internal_server_state: InternalServerState::default(),
output: EngineOutput::new(4096),
}
Expand Down Expand Up @@ -228,7 +223,7 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C::NodeId>) -> VoteResponse<C::NodeId> {
let now = *self.timer.now();
let now = Instant::now();
let lease = self.config.timer_config.leader_lease;
let vote = self.state.vote_ref();

Expand Down Expand Up @@ -637,7 +632,6 @@ where C: RaftTypeConfig
VoteHandler {
config: &self.config,
state: &mut self.state,
timer: &mut self.timer,
output: &mut self.output,
internal_server_state: &mut self.internal_server_state,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use maplit::btreeset;
use tokio::time::Instant;

use crate::engine::testing::UTConfig;
use crate::engine::Engine;
Expand All @@ -25,7 +26,7 @@ fn eng() -> Engine<UTConfig> {
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.config.id = 2;
eng.state.vote.update(*eng.timer.now(), Vote::new_committed(2, 1));
eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1));
eng.state.log_ids.append(log_id1(1, 1));
eng.state.log_ids.append(log_id1(2, 3));
eng.state.membership_state = MembershipState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use maplit::btreeset;
use pretty_assertions::assert_eq;
use tokio::time::Instant;

use crate::core::sm;
use crate::engine::testing::UTConfig;
Expand All @@ -28,7 +29,7 @@ fn eng() -> Engine<UTConfig> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.vote.update(*eng.timer.now(), Vote::new_committed(2, 1));
eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1));
eng.state.committed = Some(log_id1(4, 5));
eng.state.log_ids = LogIdList::new(vec![
//
Expand Down Expand Up @@ -171,7 +172,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
let mut eng = Engine::<UTConfig>::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.vote.update(*eng.timer.now(), Vote::new_committed(2, 1));
eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1));
eng.state.committed = Some(log_id1(2, 3));
eng.state.log_ids = LogIdList::new(vec![
//
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use maplit::btreeset;
use pretty_assertions::assert_eq;
use tokio::time::Instant;

use crate::core::sm;
use crate::engine::testing::UTConfig;
Expand All @@ -24,7 +25,7 @@ fn eng() -> Engine<UTConfig> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state

eng.state.vote.update(*eng.timer.now(), Vote::new_committed(2, 1));
eng.state.vote.update(Instant::now(), Vote::new_committed(2, 1));
eng.state.server_state = eng.calc_server_state();

eng
Expand Down
94 changes: 80 additions & 14 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::ops::Deref;

use tokio::time::Instant;

use crate::display_ext::DisplayOptionExt;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
Expand All @@ -14,6 +16,7 @@ use crate::progress::Inflight;
use crate::progress::Progress;
use crate::raft_state::LogStateReader;
use crate::replication::ReplicationResult;
use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down Expand Up @@ -111,13 +114,81 @@ where C: RaftTypeConfig
pub(crate) fn rebuild_progresses(&mut self) {
let em = self.state.membership_state.effective();

let end = self.state.last_log_id().next_index();

let old_progress = self.leader.progress.clone();
let learner_ids = em.learner_ids().collect::<Vec<_>>();

self.leader.progress =
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, ProgressEntry::empty(end));
{
let end = self.state.last_log_id().next_index();
let default_v = ProgressEntry::empty(end);

let old_progress = self.leader.progress.clone();

self.leader.progress =
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, default_v);
}

{
let old_progress = self.leader.clock_progress.clone();

self.leader.clock_progress =
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, None);
}
}

/// Update replication progress when a successful response is received.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_success_progress(
&mut self,
target: C::NodeId,
request_id: u64,
result: UTime<ReplicationResult<C::NodeId>>,
) {
let sending_time = result.utime().unwrap();

// No matter what the result is, the validity of the leader is granted by a follower.
self.update_leader_vote_clock(target, sending_time);

match result.into_inner() {
ReplicationResult::Matching(matching) => {
self.update_matching(target, request_id, matching);
}
ReplicationResult::Conflict(conflict) => {
self.update_conflicting(target, request_id, conflict);
}
}
}

/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: Instant) {
tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!());

let granted = *self
.leader
.clock_progress
.update(&node_id, Some(t))
.expect("it should always update existing progress");

tracing::debug!(
granted = debug(granted),
clock_progress = debug(&self.leader.clock_progress),
"granted leader vote clock after updating"
);

// When membership changes, the granted value may revert to a previous value.
// E.g.: when membership changes from 12345 to {12345,123}:
// ```
// Voters: 1 2 3 4 5
// Value: 1 1 2 2 2 // 2 is granted by a quorum
//
// Voters: 1 2 3 4 5
// 1 2 3
// Value: 1 1 2 2 2 // 1 is granted by a quorum
// ```
if granted > self.leader.vote.utime() {
// Safe unwrap(): Only Some() can be greater than another Option
self.leader.vote.touch(granted.unwrap());
}
}

/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
Expand Down Expand Up @@ -200,7 +271,7 @@ where C: RaftTypeConfig
&mut self,
target: C::NodeId,
request_id: u64,
repl_res: Result<ReplicationResult<C::NodeId>, String>,
repl_res: Result<UTime<ReplicationResult<C::NodeId>>, String>,
) {
// TODO(2): test

Expand All @@ -214,14 +285,9 @@ where C: RaftTypeConfig
);

match repl_res {
Ok(p) => match p {
ReplicationResult::Matching(matching) => {
self.update_matching(target, request_id, matching);
}
ReplicationResult::Conflict(conflict) => {
self.update_conflicting(target, request_id, conflict);
}
},
Ok(p) => {
self.update_success_progress(target, request_id, p);
}
Err(err_str) => {
tracing::warn!(
id = display(request_id),
Expand Down
Loading

0 comments on commit e9eed21

Please sign in to comment.