Skip to content

Commit

Permalink
Change: do not store learners in Membership
Browse files Browse the repository at this point in the history
- Change: rename NodeIdNotInNodes to LackNodeInfo for letting it
  represent a more generalized error type.

- Change: remove struct `EitherNodesOrIds`, add trait `IntoOptionNodes` to
  convert types into internal type to store node infos.

  Because we do not need to store multiple types, but only need to
  convert them to our type.

- Change: Membership remove redundent field `learners`: the node ids that
  are in `Membership.nodes` but not in `Membership.configs` are
  learners.

  Some method signatures changed along:

    - Change: `EffectiveMembership.get_nodes()` and
      `Membership.get_nodes()` returns `&BTreeMap` instead of
      `Option<BTreeMap>`.

    - Change: `EffectiveMembership.get_node()` use `&node_id` instead of
      `node_id` for arg.

- Add: `Membership::with_nodes(configs, nodes)` to create a new instance with membership configs and optional node infos.

- fix: #225

commit-id:e0ff8a75
  • Loading branch information
drmingdrmer committed Mar 7, 2022
1 parent 6eeb930 commit 650e235
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 323 deletions.
10 changes: 5 additions & 5 deletions example-raft-kv/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ async fn test_cluster() -> anyhow::Result<()> {

let nodes_in_cluster = x.membership_config.get_nodes();
assert_eq!(
Some(&btreemap! {
1 => Node::new("127.0.0.1:21001"),
2 => Node::new("127.0.0.1:21002"),
3 => Node::new("127.0.0.1:21003"),
}),
&btreemap! {
1 => Some(Node::new("127.0.0.1:21001")),
2 => Some(Node::new("127.0.0.1:21002")),
3 => Some(Node::new("127.0.0.1:21003")),
},
nodes_in_cluster
);

Expand Down
20 changes: 10 additions & 10 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::option::Option::None;
use std::sync::Arc;
Expand All @@ -14,11 +15,10 @@ use crate::error::ClientWriteError;
use crate::error::EmptyMembership;
use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LackNodeInfo;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::NodeIdNotInNodes;
use crate::leader_metrics::RemoveTarget;
use crate::membership::EitherNodesOrIds;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
Expand All @@ -38,7 +38,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_init_with_config(
&mut self,
members: EitherNodesOrIds<C>,
members: BTreeMap<C::NodeId, Option<Node>>,
) -> Result<(), InitializeError<C>> {
// TODO(xp): simplify this condition

Expand All @@ -49,17 +49,17 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
return Err(InitializeError::NotAllowed);
}

let node_ids = members.node_ids();
let node_ids = members.keys().cloned().collect::<BTreeSet<C::NodeId>>();

if !node_ids.contains(&self.core.id) {
let e = NodeIdNotInNodes {
let e = LackNodeInfo {
node_id: self.core.id,
node_ids,
reason: "can not be initialized: it is not a member".to_string(),
};
return Err(InitializeError::NodeNotInCluster(e));
return Err(InitializeError::LackNodeInfo(e));
}

let membership = Membership::new(vec![node_ids], None).set_nodes(members.nodes())?;
let membership = Membership::with_nodes(vec![node_ids], members)?;

let payload = EntryPayload::Membership(membership.clone());
let _ent = self.core.append_payload_to_log(payload).await?;
Expand All @@ -77,7 +77,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
&mut self,
target: C::NodeId,
node: Option<Node>,
) -> Result<bool, NodeIdNotInNodes<C>> {
) -> Result<bool, LackNodeInfo<C>> {
tracing::debug!(
"add_learner_into_membership target node {:?} into learner {:?}",
target,
Expand Down Expand Up @@ -195,7 +195,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
match res {
Ok(x) => x,
Err(e) => {
let change_err = ChangeMembershipError::NodeNotInCluster(e);
let change_err = ChangeMembershipError::LackNodeInfo(e);
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(change_err)));
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

let my_id = self.core.id;
let target = *target;
let target_node = self.core.effective_membership.get_node(target).cloned();
let target_node = self.core.effective_membership.get_node(&target).cloned();
let mut network = self.core.network.connect(target, target_node.as_ref()).await;

let ttl = Duration::from_millis(self.core.config.heartbeat_interval);
Expand Down
11 changes: 6 additions & 5 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ impl<C: RaftTypeConfig> EffectiveMembership<C> {
&self.all_members
}

pub(crate) fn all_learners(&self) -> &BTreeSet<C::NodeId> {
// TODO(xp): make it an iter
pub(crate) fn all_learners(&self) -> BTreeSet<C::NodeId> {
self.membership.all_learners()
}

Expand All @@ -127,12 +128,12 @@ impl<C: RaftTypeConfig> EffectiveMembership<C> {
self.membership.get_configs()
}

pub fn get_node(&self, node_id: C::NodeId) -> Option<&Node> {
pub fn get_node(&self, node_id: &C::NodeId) -> Option<&Node> {
self.membership.get_node(node_id)
}

pub fn get_nodes(&self) -> Option<&BTreeMap<C::NodeId, Node>> {
self.membership.get_nodes().as_ref()
pub fn get_nodes(&self) -> &BTreeMap<C::NodeId, Option<Node>> {
self.membership.get_nodes()
}
}

Expand Down Expand Up @@ -607,7 +608,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<Node> {
match leader_id {
None => None,
Some(id) => self.effective_membership.get_node(id).cloned(),
Some(id) => self.effective_membership.get_node(&id).cloned(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
target: C::NodeId,
caller_tx: Option<RaftRespTx<AddLearnerResponse<C>, AddLearnerError<C>>>,
) -> ReplicationState<C> {
let target_node = self.core.effective_membership.get_node(target);
let target_node = self.core.effective_membership.get_node(&target);
let repl_stream = ReplicationStream::new::<N, S>(
target,
target_node.cloned(),
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
for member in all_nodes.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(self.core.vote, self.core.last_log_id);

let target_node = self.core.effective_membership.get_node(member).cloned();
let target_node = self.core.effective_membership.get_node(&member).cloned();

let (mut network, tx_inner) = (
self.core.network.connect(member, target_node.as_ref()).await,
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub enum ChangeMembershipError<C: RaftTypeConfig> {
LearnerIsLagging(#[from] LearnerIsLagging<C>),

#[error(transparent)]
NodeNotInCluster(#[from] NodeIdNotInNodes<C>),
LackNodeInfo(#[from] LackNodeInfo<C>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand All @@ -128,7 +128,7 @@ pub enum AddLearnerError<C: RaftTypeConfig> {
Exists(C::NodeId),

#[error(transparent)]
NodeNotInCluster(#[from] NodeIdNotInNodes<C>),
LackNodeInfo(#[from] LackNodeInfo<C>),

#[error(transparent)]
Fatal(#[from] Fatal<C>),
Expand All @@ -153,7 +153,7 @@ pub enum InitializeError<C: RaftTypeConfig> {
NotAllowed,

#[error(transparent)]
NodeNotInCluster(#[from] NodeIdNotInNodes<C>),
LackNodeInfo(#[from] LackNodeInfo<C>),

#[error(transparent)]
Fatal(#[from] Fatal<C>),
Expand Down Expand Up @@ -357,10 +357,10 @@ pub struct LearnerIsLagging<C: RaftTypeConfig> {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("node {node_id} not found in cluster: {node_ids:?}")]
pub struct NodeIdNotInNodes<C: RaftTypeConfig> {
#[error("node {node_id} {reason}")]
pub struct LackNodeInfo<C: RaftTypeConfig> {
pub node_id: C::NodeId,
pub node_ids: BTreeSet<C::NodeId>,
pub reason: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand Down
Loading

0 comments on commit 650e235

Please sign in to comment.