Skip to content

Commit

Permalink
Feature: add Raft::remove_learner()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 2, 2022
1 parent 56595ed commit 568ca47
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 7 deletions.
28 changes: 28 additions & 0 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::RemoveLearnerError;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
Expand Down Expand Up @@ -112,6 +113,33 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

/// Remove a node from the cluster.
#[tracing::instrument(level = "debug", skip(self, tx))]
pub(super) fn remove_learner(&mut self, target: NodeId, tx: RaftRespTx<(), RemoveLearnerError>) {
tracing::info!("remove_learner: target: {}", target);

// Ensure the node doesn't already exist in the current
// config, in the set of new nodes already being synced, or in the nodes being removed.
if target == self.core.id {
tracing::info!("target node is this node");
let _ = tx.send(Err(RemoveLearnerError::NotLearner(target)));
return;
}

if self.core.effective_membership.membership.contains(&target) {
let _ = tx.send(Err(RemoveLearnerError::NotLearner(target)));
return;
}

let removed = self.nodes.remove(&target);
if removed.is_none() {
let _ = tx.send(Err(RemoveLearnerError::NotExists(target)));
return;
}

let _ = tx.send(Ok(()));
}

#[tracing::instrument(level = "debug", skip(self, tx))]
pub(super) async fn change_membership(
&mut self,
Expand Down
12 changes: 12 additions & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::AddLearner { id, tx, blocking } => {
self.add_learner(id, tx, blocking);
}
RaftMsg::RemoveLearner { id, tx } => {
self.remove_learner(id, tx);
}
RaftMsg::ChangeMembership { members, blocking, tx } => {
self.change_membership(members, blocking, tx).await?;
}
Expand Down Expand Up @@ -955,6 +958,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::AddLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::RemoveLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ChangeMembership { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
Expand Down Expand Up @@ -1034,6 +1040,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::AddLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::RemoveLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ChangeMembership { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
Expand Down Expand Up @@ -1108,6 +1117,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::AddLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::RemoveLearner { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ChangeMembership { tx, .. } => {
self.core.reject_with_forward_to_leader(tx);
}
Expand Down
17 changes: 15 additions & 2 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ pub enum ChangeMembershipError {
#[error(transparent)]
EmptyMembership(#[from] EmptyMembership),

// TODO(xp): 111 test it
#[error(transparent)]
LearnerNotFound(#[from] LearnerNotFound),

// TODO(xp): 111 test it
#[error(transparent)]
LearnerIsLagging(#[from] LearnerIsLagging),
}
Expand All @@ -124,6 +122,21 @@ pub enum AddLearnerError {
Fatal(#[from] Fatal),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum RemoveLearnerError {
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader),

#[error("node {0} is not a learner, but a voter")]
NotLearner(NodeId),

#[error("node {0} is not a learner or voter")]
NotExists(NodeId),

#[error(transparent)]
Fatal(#[from] Fatal),
}

/// The set of errors which may take place when initializing a pristine Raft node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum InitializeError {
Expand Down
25 changes: 22 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::RemoveLearnerError;
use crate::error::VoteError;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
Expand Down Expand Up @@ -246,7 +247,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.call_core(RaftMsg::Initialize { members, tx }, rx).await
}

/// Synchronize a new Raft node, optionally, blocking until up-to-speed (§6).
/// Add a new Raft node as learner(does not vote), optionally, blocking until up-to-speed (§6).
///
/// - Add a node as learner into the cluster.
/// - Setup replication from leader to it.
Expand All @@ -257,7 +258,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
///
/// If blocking is false, this function returns at once as successfully setting up the replication.
///
/// If the node to add is already a voter or learner, it returns `RaftResponse::NoChange` at once.
/// If the node to add is already a voter or learner, it returns `AddLearnerError::Exist` at once.
#[tracing::instrument(level = "info", skip_all, fields(target=id))]
pub async fn add_learner(&self, id: NodeId, blocking: bool) -> Result<AddLearnerResponse, AddLearnerError> {
tracing::info!("add_learner: target: {}, blocking: {}", id, blocking);
Expand All @@ -266,6 +267,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.call_core(RaftMsg::AddLearner { id, blocking, tx }, rx).await
}

/// Remove a learner Raft node.
///
/// If the `target` node is not a learner(a voter), it returns `RemoveLearnerError::NotLearner` error.
/// If the `target` node does not exist, it returns `RemoveLearnerError::NotExists` error.
#[tracing::instrument(level = "info", skip_all, fields(target=id))]
pub async fn remove_learner(&self, id: NodeId) -> Result<(), RemoveLearnerError> {
tracing::info!("remove_learner: target: {}", id);

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::RemoveLearner { id, tx }, rx).await
}

/// Propose a cluster configuration change.
///
/// If a node in the proposed config but is not yet a voter or learner, it first calls `add_learner` to setup
Expand Down Expand Up @@ -475,7 +488,6 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
members: BTreeSet<NodeId>,
tx: RaftRespTx<(), InitializeError>,
},
// TODO(xp): make tx a field of a struct
/// Request raft core to setup a new replication to a learner.
AddLearner {
id: NodeId,
Expand All @@ -486,6 +498,10 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
/// Send the log id when the replication becomes line-rate.
tx: RaftRespTx<AddLearnerResponse, AddLearnerError>,
},
RemoveLearner {
id: NodeId,
tx: RaftRespTx<(), RemoveLearnerError>,
},
ChangeMembership {
members: BTreeSet<NodeId>,
/// with blocking==false, respond to client a ChangeMembershipError::LearnerIsLagging error at once if a
Expand Down Expand Up @@ -527,6 +543,9 @@ where
RaftMsg::AddLearner { id, blocking, .. } => {
format!("AddLearner: id: {}, blocking: {}", id, blocking)
}
RaftMsg::RemoveLearner { id, .. } => {
format!("RemoveLearner: id: {}", id)
}
RaftMsg::ChangeMembership { members, blocking, .. } => {
format!("ChangeMembership: members: {:?}, blocking: {}", members, blocking)
}
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod fixtures;

mod t00_learner_restart;
mod t10_add_learner;
mod t10_remove_learner;
mod t15_add_remove_follower;
mod t20_change_membership;
mod t25_elect_with_new_config;
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/membership/t10_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,5 @@ async fn add_learner_non_blocking() -> Result<()> {
}

fn timeout() -> Option<Duration> {
Some(Duration::from_micros(500))
Some(Duration::from_millis(500))
}
75 changes: 75 additions & 0 deletions openraft/tests/membership/t10_remove_learner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::error::RemoveLearnerError;
use openraft::Config;

use crate::fixtures::RaftRouter;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn remove_learner() -> Result<()> {
// - When learner is removed, logs should stop replicating to it.

let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(
Config {
replication_lag_threshold: 0,
max_applied_log_to_keep: 2000, // prevent snapshot
..Default::default()
}
.validate()?,
);
let router = Arc::new(RaftRouter::new(config.clone()));

let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;
let leader = router.get_raft_handle(&0).await?;

tracing::info!("--- sends log to leader and replicates to learner");
{
router.client_request_many(0, "foo", 10).await;
log_index += 10;

for node_id in [0, 1] {
router.wait(&node_id, timeout()).await?.log(Some(log_index), "before removing learner").await?;
}
}

let learner_log_index = log_index;

tracing::info!("--- remove learner");
{
leader.remove_learner(1).await?;
}

tracing::info!("--- sends log to leader and should not replicate to learner");
{
router.client_request_many(0, "foo", 10).await;
log_index += 10;

router.wait(&0, timeout()).await?.log(Some(log_index), "leader log after removing learner").await?;
router
.wait(&1, timeout())
.await?
.log(Some(learner_log_index), "learner log after removing learner")
.await?;
}

tracing::info!("--- remove non-learner");
{
let res = leader.remove_learner(0).await;
assert_eq!(Err(RemoveLearnerError::NotLearner(0)), res);

let res = leader.remove_learner(2).await;
assert_eq!(Err(RemoveLearnerError::NotExists(2)), res);
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}
2 changes: 1 addition & 1 deletion openraft/tests/membership/t20_change_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
}

fn timeout() -> Option<Duration> {
Some(Duration::from_micros(500))
Some(Duration::from_millis(500))
}

0 comments on commit 568ca47

Please sign in to comment.