Skip to content

Commit

Permalink
Change: RaftStorage::install_snapshot() does not need to return state…
Browse files Browse the repository at this point in the history
… changes

The caller of `RaftStorage::install_snapshot()` knows about what changes
have been made, the return value is unnecessary.
  • Loading branch information
drmingdrmer committed Aug 30, 2022
1 parent 6cd7f43 commit 3111e7e
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 49 deletions.
8 changes: 2 additions & 6 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
Expand Down Expand Up @@ -305,7 +304,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
&mut self,
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
) -> Result<(), StorageError<ExampleNodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand Down Expand Up @@ -333,10 +332,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
// Update current snapshot.
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(new_snapshot);
Ok(StateMachineChanges {
last_applied: meta.last_log_id,
is_snapshot: true,
})
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
8 changes: 2 additions & 6 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
Expand Down Expand Up @@ -527,7 +526,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
&mut self,
meta: &SnapshotMeta<ExampleNodeId, ExampleNode>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
) -> Result<(), StorageError<ExampleNodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand All @@ -553,10 +552,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}

self.set_current_snapshot_(new_snapshot)?;
Ok(StateMachineChanges {
last_applied: meta.last_log_id,
is_snapshot: true,
})
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
8 changes: 2 additions & 6 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use openraft::LogId;
use openraft::RaftStorage;
use openraft::RaftStorageDebug;
use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
Expand Down Expand Up @@ -358,7 +357,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
&mut self,
meta: &SnapshotMeta<MemNodeId, ()>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<Config>, StorageError<MemNodeId>> {
) -> Result<(), StorageError<MemNodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand Down Expand Up @@ -392,10 +391,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
// Update current snapshot.
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(new_snapshot);
Ok(StateMachineChanges {
last_applied: meta.last_log_id,
is_snapshot: true,
})
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1685,9 +1685,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
let snapshot_data = self.received_snapshot.remove(&snapshot_meta.snapshot_id);

if let Some(data) = snapshot_data {
// TODO: `changes` is not used
let changes = self.storage.install_snapshot(snapshot_meta, data).await?;
tracing::debug!("update after install-snapshot: {:?}", changes);
self.storage.install_snapshot(snapshot_meta, data).await?;
tracing::debug!("Done install_snapshot, meta: {:?}", snapshot_meta);
} else {
unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta)
}
Expand Down
1 change: 0 additions & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ pub use crate::raft_types::LogIdOptionExt;
pub(crate) use crate::raft_types::MetricsChangeFlags;
pub use crate::raft_types::SnapshotId;
pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::StateMachineChanges;
pub use crate::raft_types::Update;
pub use crate::storage::RaftLogReader;
pub use crate::storage::RaftSnapshotBuilder;
Expand Down
9 changes: 0 additions & 9 deletions openraft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::fmt::Formatter;
use crate::LeaderId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;

/// The identity of a raft log.
/// A term, node_id and an index identifies an log globally.
Expand Down Expand Up @@ -197,11 +196,3 @@ impl MetricsChangeFlags {
self.cluster = true
}
}

/// The changes of a state machine.
/// E.g. when applying a log to state machine, or installing a state machine from snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateMachineChanges<C: RaftTypeConfig> {
pub last_applied: Option<LogId<C::NodeId>>,
pub is_snapshot: bool,
}
5 changes: 2 additions & 3 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::defensive::check_range_matches_entries;
use crate::membership::EffectiveMembership;
use crate::node::Node;
use crate::raft_types::SnapshotId;
use crate::raft_types::StateMachineChanges;
use crate::Entry;
use crate::LogId;
use crate::MessageSummary;
Expand Down Expand Up @@ -283,7 +282,7 @@ where C: RaftTypeConfig
/// for details on log compaction / snapshotting.
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<C::NodeId>>;

/// Install a snapshot which has finished streaming from the cluster leader.
/// Install a snapshot which has finished streaming from the leader.
///
/// All other snapshots should be deleted at this point.
///
Expand All @@ -293,7 +292,7 @@ where C: RaftTypeConfig
&mut self,
meta: &SnapshotMeta<C::NodeId, C::Node>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<C>, StorageError<C::NodeId>>;
) -> Result<(), StorageError<C::NodeId>>;

/// Get a readable handle to the current snapshot, along with its metadata.
///
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::RaftStorage;
use crate::RaftStorageDebug;
use crate::RaftTypeConfig;
use crate::SnapshotMeta;
use crate::StateMachineChanges;
use crate::StorageError;
use crate::Vote;
use crate::Wrapper;
Expand Down Expand Up @@ -178,7 +177,7 @@ where
&mut self,
meta: &SnapshotMeta<C::NodeId, C::Node>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<C>, StorageError<C::NodeId>> {
) -> Result<(), StorageError<C::NodeId>> {
self.inner().install_snapshot(meta, snapshot).await
}

Expand Down
7 changes: 6 additions & 1 deletion openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use openraft::Config;
use openraft::LeaderId;
use openraft::LogId;
use openraft::RaftLogReader;
use tokio::time::sleep;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand Down Expand Up @@ -46,7 +47,7 @@ async fn purge_in_snapshot_logs() -> Result<()> {
assert_eq!(max_keep as usize, logs.len());
}

// Leader: .......15..20
// Leader: -------15..20
// Learner: 0..10
tracing::info!("--- block replication, build another snapshot");
{
Expand All @@ -62,6 +63,10 @@ async fn purge_in_snapshot_logs() -> Result<()> {
.await?;
}

// There may be a cached append-entries request that already loads log 10..15 from the store, just before building
// snapshot.
sleep(Duration::from_millis(500)).await;

tracing::info!("--- restore replication, install the 2nd snapshot on learner");
{
router.restore_node(1);
Expand Down
8 changes: 2 additions & 6 deletions rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
Expand Down Expand Up @@ -534,7 +533,7 @@ impl RaftStorage<Config> for Arc<RocksStore> {
&mut self,
meta: &SnapshotMeta<RocksNodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<Config>, StorageError<RocksNodeId>> {
) -> Result<(), StorageError<RocksNodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand All @@ -561,10 +560,7 @@ impl RaftStorage<Config> for Arc<RocksStore> {

self.put_meta::<meta::Snapshot>(&new_snapshot)?;

Ok(StateMachineChanges {
last_applied: meta.last_log_id,
is_snapshot: true,
})
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
8 changes: 2 additions & 6 deletions sledstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::SnapshotMeta;
use openraft::StateMachineChanges;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::Vote;
Expand Down Expand Up @@ -627,7 +626,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<SledStore> {
&mut self,
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
) -> Result<(), StorageError<ExampleNodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand All @@ -653,10 +652,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<SledStore> {
}

self.set_current_snapshot_(new_snapshot).await?;
Ok(StateMachineChanges {
last_applied: meta.last_log_id,
is_snapshot: true,
})
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down

0 comments on commit 3111e7e

Please sign in to comment.