Skip to content

Commit

Permalink
Change: replace EffectiveMembership with StoredMembership in RaftStorage
Browse files Browse the repository at this point in the history
`EffectiveMembership` is a struct used at runtime, which contains
additional information such as an optimized `QuorumSet` implementation
that has different structure from a `Membership`.

To better separate concerns, a new struct called `StoredMembership` has
been introduced specifically for storage purpose. It contains only the
information that needs to be stored in storage. Therefore,
`StoredMembership` is used instead of `EffectiveMembership` in
RaftStorage.

Upgrade tip:

Replace `EffectiveMembership` with `StoredMembership` in an application.

Fields in `EffectiveMembership` are made private and can be accessed via
corresponding methods such as: `EffectiveMembership.log_id` and
`EffectiveMembership.membership` should be replaced with
`EffectiveMembership::log_id()` and `EffectiveMembership::membership()`.
  • Loading branch information
drmingdrmer committed Feb 26, 2023
1 parent 1cc2198 commit 0a1dd3d
Show file tree
Hide file tree
Showing 38 changed files with 351 additions and 261 deletions.
15 changes: 5 additions & 10 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::BasicNode;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorSubject;
Expand All @@ -22,6 +21,7 @@ use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct ExampleStateMachine {
pub last_applied_log: Option<LogId<ExampleNodeId>>,

// TODO: it should not be Option.
pub last_membership: EffectiveMembership<ExampleNodeId, BasicNode>,
pub last_membership: StoredMembership<ExampleNodeId, BasicNode>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -250,13 +250,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {

async fn last_applied_state(
&mut self,
) -> Result<
(
Option<LogId<ExampleNodeId>>,
EffectiveMembership<ExampleNodeId, BasicNode>,
),
StorageError<ExampleNodeId>,
> {
) -> Result<(Option<LogId<ExampleNodeId>>, StoredMembership<ExampleNodeId, BasicNode>), StorageError<ExampleNodeId>>
{
let state_machine = self.state_machine.read().await;
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -286,7 +281,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(ExampleResponse { value: None })
}
};
Expand Down
9 changes: 6 additions & 3 deletions examples/raft-kv-memstore/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async fn test_cluster() -> anyhow::Result<()> {
println!("=== metrics after add-learner");
let x = client.metrics().await?;

assert_eq!(&vec![vec![1]], x.membership_config.get_joint_config());
assert_eq!(&vec![btreeset![1]], x.membership_config.membership().get_joint_config());

let nodes_in_cluster =
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
Expand Down Expand Up @@ -161,7 +161,10 @@ async fn test_cluster() -> anyhow::Result<()> {

println!("=== metrics after change-member");
let x = client.metrics().await?;
assert_eq!(&vec![vec![1, 2, 3]], x.membership_config.get_joint_config());
assert_eq!(
&vec![btreeset![1, 2, 3]],
x.membership_config.membership().get_joint_config()
);

// --- Try to write some application data through the leader.

Expand Down Expand Up @@ -246,7 +249,7 @@ async fn test_cluster() -> anyhow::Result<()> {

println!("=== metrics after change-membership to {{3}}");
let x = client.metrics().await?;
assert_eq!(&vec![vec![3]], x.membership_config.get_joint_config());
assert_eq!(&vec![btreeset![3]], x.membership_config.membership().get_joint_config());

println!("=== write `foo=zoo` to node-3");
let _x = client3
Expand Down
15 changes: 8 additions & 7 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use openraft::async_trait::async_trait;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorSubject;
Expand All @@ -26,6 +25,7 @@ use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::Vote;
use rocksdb::ColumnFamily;
use rocksdb::ColumnFamilyDescriptor;
Expand Down Expand Up @@ -82,7 +82,7 @@ pub struct SerializableExampleStateMachine {
pub last_applied_log: Option<LogId<ExampleNodeId>>,

// TODO: it should not be Option.
pub last_membership: EffectiveMembership<ExampleNodeId, ExampleNode>,
pub last_membership: StoredMembership<ExampleNodeId, ExampleNode>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -127,7 +127,7 @@ fn sm_w_err<E: Error + 'static>(e: E) -> StorageError<ExampleNodeId> {
}

impl ExampleStateMachine {
fn get_last_membership(&self) -> StorageResult<EffectiveMembership<ExampleNodeId, ExampleNode>> {
fn get_last_membership(&self) -> StorageResult<StoredMembership<ExampleNodeId, ExampleNode>> {
self.db
.get_cf(
self.db.cf_handle("state_machine").expect("cf_handle"),
Expand All @@ -137,10 +137,10 @@ impl ExampleStateMachine {
.and_then(|value| {
value
.map(|v| serde_json::from_slice(&v).map_err(sm_r_err))
.unwrap_or_else(|| Ok(EffectiveMembership::default()))
.unwrap_or_else(|| Ok(StoredMembership::default()))
})
}
fn set_last_membership(&self, membership: EffectiveMembership<ExampleNodeId, ExampleNode>) -> StorageResult<()> {
fn set_last_membership(&self, membership: StoredMembership<ExampleNodeId, ExampleNode>) -> StorageResult<()> {
self.db
.put_cf(
self.db.cf_handle("state_machine").expect("cf_handle"),
Expand Down Expand Up @@ -484,7 +484,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
) -> Result<
(
Option<LogId<ExampleNodeId>>,
EffectiveMembership<ExampleNodeId, ExampleNode>,
StoredMembership<ExampleNodeId, ExampleNode>,
),
StorageError<ExampleNodeId>,
> {
Expand Down Expand Up @@ -520,7 +520,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}
},
EntryPayload::Membership(ref mem) => {
sm.set_last_membership(EffectiveMembership::new(Some(entry.log_id), mem.clone()))?;
sm.set_last_membership(StoredMembership::new(Some(entry.log_id), mem.clone()))?;

res.push(ExampleResponse { value: None })
}
};
Expand Down
7 changes: 5 additions & 2 deletions examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
println!("=== metrics after add-learner");
let x = leader.metrics().await?;

assert_eq!(&vec![vec![1]], x.membership_config.get_joint_config());
assert_eq!(&vec![btreeset![1]], x.membership_config.membership().get_joint_config());

let nodes_in_cluster =
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
Expand Down Expand Up @@ -155,7 +155,10 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {

println!("=== metrics after change-member");
let x = leader.metrics().await?;
assert_eq!(&vec![vec![1, 2, 3]], x.membership_config.get_joint_config());
assert_eq!(
&vec![btreeset![1, 2, 3]],
x.membership_config.membership().get_joint_config()
);

// --- Try to write some application data through the leader.

Expand Down
8 changes: 4 additions & 4 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use openraft::storage::RaftLogReader;
use openraft::storage::RaftSnapshotBuilder;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorSubject;
Expand All @@ -25,6 +24,7 @@ use openraft::RaftStorageDebug;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -88,7 +88,7 @@ pub struct MemStoreSnapshot {
pub struct MemStoreStateMachine {
pub last_applied_log: Option<LogId<MemNodeId>>,

pub last_membership: EffectiveMembership<MemNodeId, ()>,
pub last_membership: StoredMembership<MemNodeId, ()>,

/// A mapping of client IDs to their state info.
pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
Expand Down Expand Up @@ -259,7 +259,7 @@ impl RaftStorage<Config> for Arc<MemStore> {

async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<MemNodeId>>, EffectiveMembership<MemNodeId, ()>), StorageError<MemNodeId>> {
) -> Result<(Option<LogId<MemNodeId>>, StoredMembership<MemNodeId, ()>), StorageError<MemNodeId>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
Expand Down Expand Up @@ -339,7 +339,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
res.push(ClientResponse(previous));
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(ClientResponse(None))
}
};
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// request failures.

let _ = tx.send(Err(QuorumNotEnough {
cluster: self.engine.state.membership_state.effective().membership.summary(),
cluster: self.engine.state.membership_state.effective().membership().summary(),
got: granted,
}
.into()));
Expand Down Expand Up @@ -472,7 +472,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// --- cluster ---
state: self.engine.state.server_state,
current_leader: self.current_leader(),
membership_config: self.engine.state.membership_state.effective().clone(),
membership_config: self.engine.state.membership_state.effective().stored_membership().clone(),

// --- replication ---
replication,
Expand Down Expand Up @@ -773,10 +773,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Safe unwrap(): target must be in membership
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();

let membership_log_id = self.engine.state.membership_state.effective().log_id;
let membership_log_id = self.engine.state.membership_state.effective().log_id();
let network = self.network.new_client(target, target_node).await;

let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), membership_log_id);
let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), *membership_log_id);

ReplicationCore::<C, N, S>::spawn(
target,
Expand Down Expand Up @@ -1234,11 +1234,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
return false;
}

if session_id.membership_log_id != self.engine.state.membership_state.effective().log_id {
if &session_id.membership_log_id != self.engine.state.membership_state.effective().log_id() {
tracing::warn!(
"membership_log_id changed: msg sent by: {}; curr: {}; ignore when ({})",
session_id.membership_log_id.summary(),
self.engine.state.membership_state.effective().log_id.summary(),
self.engine.state.membership_state.effective().log_id().summary(),
msg
);
return false;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ where
);

#[allow(clippy::collapsible_if)]
if em.log_id.as_ref() <= self.state.committed() {
if em.log_id().as_ref() <= self.state.committed() {
if !em.is_voter(&self.config.id) && self.state.is_leading(&self.config.id) {
tracing::debug!("leader {} is stepping down", self.config.id);
self.vote_handler().become_following();
Expand Down
Loading

0 comments on commit 0a1dd3d

Please sign in to comment.