Skip to content

Commit

Permalink
Change: Turn Node into a trait (#480)
Browse files Browse the repository at this point in the history
Structs that depend on `Node` now have to implement `trait Node`,  or use a predefined basic implementation `BasicNode`. E.g., `struct Membership` now has two type parameters: `impl<NID, N> Membership<NID, N> where N: Node, NID: NodeId`.








Signed-off-by: Heinz N. Gies <heinz@licenser.net>
  • Loading branch information
Licenser authored Aug 3, 2022
1 parent 0d024b3 commit e4b705c
Show file tree
Hide file tree
Showing 64 changed files with 1,014 additions and 555 deletions.
42 changes: 30 additions & 12 deletions examples/raft-kv-memstore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use openraft::error::RPCError;
use openraft::error::RemoteError;
use openraft::raft::AddLearnerResponse;
use openraft::raft::ClientWriteResponse;
use openraft::BasicNode;
use openraft::RaftMetrics;
use reqwest::Client;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -55,14 +56,17 @@ impl ExampleClient {
pub async fn write(
&self,
req: &ExampleRequest,
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
) -> Result<
ClientWriteResponse<ExampleTypeConfig>,
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
> {
self.send_rpc_to_leader("write", Some(req)).await
}

/// Read value by key, in an inconsistent mode.
///
/// This method may return stale value because it does not force to read on a legal leader.
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, Infallible>> {
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, BasicNode, Infallible>> {
self.do_send_rpc_to_leader("read", Some(req)).await
}

Expand All @@ -72,7 +76,7 @@ impl ExampleClient {
pub async fn consistent_read(
&self,
req: &String,
) -> Result<String, RPCError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId>>> {
) -> Result<String, RPCError<ExampleNodeId, BasicNode, CheckIsLeaderError<ExampleNodeId, BasicNode>>> {
self.do_send_rpc_to_leader("consistent_read", Some(req)).await
}

Expand All @@ -84,7 +88,9 @@ impl ExampleClient {
/// With a initialized cluster, new node can be added with [`write`].
/// Then setup replication with [`add_learner`].
/// Then make the new node a member with [`change_membership`].
pub async fn init(&self) -> Result<(), RPCError<ExampleNodeId, InitializeError<ExampleNodeId>>> {
pub async fn init(
&self,
) -> Result<(), RPCError<ExampleNodeId, BasicNode, InitializeError<ExampleNodeId, BasicNode>>> {
self.do_send_rpc_to_leader("init", Some(&Empty {})).await
}

Expand All @@ -94,7 +100,10 @@ impl ExampleClient {
pub async fn add_learner(
&self,
req: (ExampleNodeId, String),
) -> Result<AddLearnerResponse<ExampleNodeId>, RPCError<ExampleNodeId, AddLearnerError<ExampleNodeId>>> {
) -> Result<
AddLearnerResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, AddLearnerError<ExampleNodeId, BasicNode>>,
> {
self.send_rpc_to_leader("add-learner", Some(&req)).await
}

Expand All @@ -105,7 +114,10 @@ impl ExampleClient {
pub async fn change_membership(
&self,
req: &BTreeSet<ExampleNodeId>,
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
) -> Result<
ClientWriteResponse<ExampleTypeConfig>,
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
> {
self.send_rpc_to_leader("change-membership", Some(req)).await
}

Expand All @@ -114,7 +126,9 @@ impl ExampleClient {
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId>, RPCError<ExampleNodeId, Infallible>> {
pub async fn metrics(
&self,
) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, RPCError<ExampleNodeId, BasicNode, Infallible>> {
self.do_send_rpc_to_leader("metrics", None::<&()>).await
}

Expand All @@ -129,7 +143,7 @@ impl ExampleClient {
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Expand Down Expand Up @@ -174,17 +188,21 @@ impl ExampleClient {
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned + TryInto<ForwardToLeader<ExampleNodeId>> + Clone,
Err: std::error::Error
+ Serialize
+ DeserializeOwned
+ TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>
+ Clone,
{
// Retry at most 3 times to find a valid leader.
let mut n_retry = 3;

loop {
let res: Result<Resp, RPCError<ExampleNodeId, Err>> = self.do_send_rpc_to_leader(uri, req).await;
let res: Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>> = self.do_send_rpc_to_leader(uri, req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Expand All @@ -193,7 +211,7 @@ impl ExampleClient {

if let RPCError::RemoteError(remote_err) = &rpc_err {
let forward_err_res =
<Err as TryInto<ForwardToLeader<ExampleNodeId>>>::try_into(remote_err.source.clone());
<Err as TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>>::try_into(remote_err.source.clone());

if let Ok(ForwardToLeader {
leader_id: Some(leader_id),
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::App;
use actix_web::HttpServer;
use openraft::BasicNode;
use openraft::Config;
use openraft::Raft;

Expand All @@ -26,7 +27,7 @@ pub type ExampleNodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode
);

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use actix_web::web::Data;
use actix_web::Responder;
use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::BasicNode;
use web::Json;

use crate::app::ExampleApp;
Expand Down Expand Up @@ -45,7 +46,7 @@ pub async fn consistent_read(app: Data<ExampleApp>, req: Json<String>) -> actix_
let key = req.0;
let value = state_machine.data.get(&key).cloned();

let res: Result<String, CheckIsLeaderError<ExampleNodeId>> = Ok(value.unwrap_or_default());
let res: Result<String, CheckIsLeaderError<ExampleNodeId, BasicNode>> = Ok(value.unwrap_or_default());
Ok(Json(res))
}
Err(e) => Ok(Json(Err(e))),
Expand Down
14 changes: 4 additions & 10 deletions examples/raft-kv-memstore/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use actix_web::web;
use actix_web::web::Data;
use actix_web::Responder;
use openraft::error::Infallible;
use openraft::Node;
use openraft::BasicNode;
use openraft::RaftMetrics;
use web::Json;

Expand All @@ -27,10 +27,7 @@ pub async fn add_learner(
req: Json<(ExampleNodeId, String)>,
) -> actix_web::Result<impl Responder> {
let node_id = req.0 .0;
let node = Node {
addr: req.0 .1.clone(),
..Default::default()
};
let node = BasicNode { addr: req.0 .1.clone() };
let res = app.raft.add_learner(node_id, Some(node), true).await;
Ok(Json(res))
}
Expand All @@ -49,10 +46,7 @@ pub async fn change_membership(
#[post("/init")]
pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let mut nodes = BTreeMap::new();
nodes.insert(app.id, Node {
addr: app.addr.clone(),
data: Default::default(),
});
nodes.insert(app.id, BasicNode { addr: app.addr.clone() });
let res = app.raft.initialize(nodes).await;
Ok(Json(res))
}
Expand All @@ -62,6 +56,6 @@ pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<ExampleNodeId>, Infallible> = Ok(metrics);
let res: Result<RaftMetrics<ExampleNodeId, BasicNode>, Infallible> = Ok(metrics);
Ok(Json(res))
}
23 changes: 14 additions & 9 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use openraft::raft::InstallSnapshotRequest;
use openraft::raft::InstallSnapshotResponse;
use openraft::raft::VoteRequest;
use openraft::raft::VoteResponse;
use openraft::Node;
use openraft::BasicNode;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use serde::de::DeserializeOwned;
Expand All @@ -26,10 +26,10 @@ impl ExampleNetwork {
pub async fn send_rpc<Req, Resp, Err>(
&self,
target: ExampleNodeId,
target_node: Option<&Node>,
target_node: Option<&BasicNode>,
uri: &str,
req: Req,
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
where
Req: Serialize,
Err: std::error::Error + DeserializeOwned,
Expand Down Expand Up @@ -57,7 +57,7 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
async fn connect(
&mut self,
target: ExampleNodeId,
node: Option<&Node>,
node: Option<&BasicNode>,
) -> Result<Self::Network, Self::ConnectionError> {
Ok(ExampleNetworkConnection {
owner: ExampleNetwork {},
Expand All @@ -70,30 +70,35 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
pub struct ExampleNetworkConnection {
owner: ExampleNetwork,
target: ExampleNodeId,
target_node: Option<Node>,
target_node: Option<BasicNode>,
}

#[async_trait]
impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
async fn send_append_entries(
&mut self,
req: AppendEntriesRequest<ExampleTypeConfig>,
) -> Result<AppendEntriesResponse<ExampleNodeId>, RPCError<ExampleNodeId, AppendEntriesError<ExampleNodeId>>> {
) -> Result<
AppendEntriesResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, AppendEntriesError<ExampleNodeId>>,
> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-append", req).await
}

async fn send_install_snapshot(
&mut self,
req: InstallSnapshotRequest<ExampleTypeConfig>,
) -> Result<InstallSnapshotResponse<ExampleNodeId>, RPCError<ExampleNodeId, InstallSnapshotError<ExampleNodeId>>>
{
) -> Result<
InstallSnapshotResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, InstallSnapshotError<ExampleNodeId>>,
> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-snapshot", req).await
}

async fn send_vote(
&mut self,
req: VoteRequest<ExampleNodeId>,
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, VoteError<ExampleNodeId>>> {
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, VoteError<ExampleNodeId>>> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-vote", req).await
}
}
19 changes: 13 additions & 6 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use openraft::async_trait::async_trait;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::BasicNode;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub struct ExampleResponse {

#[derive(Debug)]
pub struct ExampleSnapshot {
pub meta: SnapshotMeta<ExampleNodeId>,
pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand All @@ -73,7 +74,7 @@ pub struct ExampleStateMachine {
pub last_applied_log: Option<LogId<ExampleNodeId>>,

// TODO: it should not be Option.
pub last_membership: EffectiveMembership<ExampleNodeId>,
pub last_membership: EffectiveMembership<ExampleNodeId, BasicNode>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -131,7 +132,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(
&mut self,
) -> Result<Snapshot<ExampleNodeId, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {
) -> Result<Snapshot<ExampleNodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -256,7 +257,13 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {

async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<ExampleNodeId>>, EffectiveMembership<ExampleNodeId>), StorageError<ExampleNodeId>> {
) -> Result<
(
Option<LogId<ExampleNodeId>>,
EffectiveMembership<ExampleNodeId, BasicNode>,
),
StorageError<ExampleNodeId>,
> {
let state_machine = self.state_machine.read().await;
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -302,7 +309,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<ExampleNodeId>,
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
tracing::info!(
Expand Down Expand Up @@ -341,7 +348,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<ExampleNodeId, Self::SnapshotData>>, StorageError<ExampleNodeId>> {
) -> Result<Option<Snapshot<ExampleNodeId, BasicNode, Self::SnapshotData>>, StorageError<ExampleNodeId>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
10 changes: 5 additions & 5 deletions examples/raft-kv-memstore/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use maplit::btreemap;
use maplit::btreeset;
use openraft::error::NodeNotFound;
use openraft::AnyError;
use openraft::Node;
use openraft::BasicNode;
use tokio::runtime::Runtime;

/// Setup a cluster of 3 nodes.
Expand Down Expand Up @@ -90,9 +90,9 @@ async fn test_cluster() -> anyhow::Result<()> {
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
assert_eq!(
btreemap! {
1 => Some(Node::new("127.0.0.1:21001")),
2 => Some(Node::new("127.0.0.1:21002")),
3 => Some(Node::new("127.0.0.1:21003")),
1 => Some(BasicNode::new("127.0.0.1:21001")),
2 => Some(BasicNode::new("127.0.0.1:21002")),
3 => Some(BasicNode::new("127.0.0.1:21003")),
},
nodes_in_cluster
);
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn test_cluster() -> anyhow::Result<()> {
match x {
Err(e) => {
let s = e.to_string();
let expect_err:String = "error occur on remote peer 2: has to forward request to: Some(1), Some(Node { addr: \"127.0.0.1:21001\", data: {} })".to_string();
let expect_err:String = "error occur on remote peer 2: has to forward request to: Some(1), Some(BasicNode { addr: \"127.0.0.1:21001\" })".to_string();

assert_eq!(s, expect_err);
}
Expand Down
Loading

0 comments on commit e4b705c

Please sign in to comment.