Skip to content

Commit

Permalink
Feature: Engine stores log ids
Browse files Browse the repository at this point in the history
- Feature: add trait `RaftLogId`: so that it is able to access the log
  id in any type that contains a log id.

- Feature: `RaftState` adds a field `log_ids` to track all existing log
  ids in the store. Thus an `Engine` do not need to access the
  `RaftStorage` to have a grasp about what logs there are.

  `Engine` needs this information to deal with log related events.

- Feature: add struct `LogIdList` that provides full information about
  all log ids but only stores several **key** log ids: it only stores
  one log id for every **leader**.

- Feture: in `RaftStorage::get_initial_state()`, it loads information
  about all log ids in a modified binary search algo.
  • Loading branch information
drmingdrmer committed Apr 22, 2022
1 parent 5fb62f4 commit 675a0f8
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 11 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use crate::core::ReplicationState;
use crate::core::ServerState;
use crate::engine::Command;
use crate::entry::EntryRef;
use crate::entry::RaftEntry;
use crate::error::ClientWriteError;
use crate::error::ExtractFatal;
use crate::error::Fatal;
use crate::metrics::ReplicationMetrics;
use crate::raft::ClientWriteResponse;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::raft_types::RaftLogId;
use crate::replication::ReplicaEvent;
use crate::runtime::RaftRuntime;
use crate::summary::MessageSummary;
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl<NID: NodeId> Engine<NID> {
self.check_initialize()?;

self.assign_log_ids(entries.iter_mut());
self.state.extend_log_ids_from_same_leader(entries);

self.commands.push(Command::AppendInputEntries { range: 0..l });
self.metrics_flags.set_data_changed();
Expand Down Expand Up @@ -168,6 +169,7 @@ impl<NID: NodeId> Engine<NID> {
}

self.assign_log_ids(entries.iter_mut());
self.state.extend_log_ids_from_same_leader(entries);

self.commands.push(Command::AppendInputEntries { range: 0..l });
self.metrics_flags.set_data_changed();
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/engine/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ fn test_initialize() -> anyhow::Result<()> {
eng.id = 1;

eng.initialize(&mut entries)?;

assert_eq!(Some(log_id0), eng.state.get_log_id(0));
assert_eq!(None, eng.state.get_log_id(1));
assert_eq!(Some(log_id0), eng.state.last_log_id);

assert_eq!(ServerState::Candidate, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
Expand All @@ -55,6 +59,7 @@ fn test_initialize() -> anyhow::Result<()> {
eng.metrics_flags
);
assert_eq!(m12(), eng.state.effective_membership.membership);

assert_eq!(
vec![
Command::AppendInputEntries { range: 0..1 },
Expand Down
105 changes: 105 additions & 0 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use crate::raft_types::RaftLogId;
use crate::LogId;
use crate::NodeId;

/// Efficient storage for log ids.
///
/// It stores only the ids of log that have a new leader_id. And the `last_log_id` at the end.
/// I.e., the oldest log id belonging to every leader.
///
/// If it is not empty, the first one is `last_purged_log_id` and the last one is `last_log_id`.
/// The last one may have the same leader id as the second last one.
#[derive(Default, Debug, Clone)]
#[derive(PartialEq, Eq)]
pub struct LogIdList<NID: NodeId> {
key_log_ids: Vec<LogId<NID>>,
}

impl<NID: NodeId> LogIdList<NID> {
pub fn new(key_log_ids: impl IntoIterator<Item = LogId<NID>>) -> Self {
Self {
key_log_ids: key_log_ids.into_iter().collect(),
}
}

/// Extends a list of `log_id` that are proposed by a same leader.
///
/// The log ids in the input has to be continuous.
pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId<NID> + 'a>(&mut self, new_ids: &[LID]) {
if let Some(first) = new_ids.first() {
let first_id = first.get_log_id();
self.append(*first_id);

if let Some(last) = new_ids.last() {
let last_id = last.get_log_id();
assert_eq!(last_id.leader_id, first_id.leader_id);

if last_id != first_id {
self.append(*last_id);
}
}
}
}

/// Append a new `log_id`.
///
/// The log id to append does not have to be the next to the last one in `key_log_ids`.
/// In such case, it is meant to append a list of log ids.
///
/// NOTE: The last two in `key_log_ids` may be with the same `leader_id`, because `last_log_id` always present in
/// `log_ids`.
pub(crate) fn append(&mut self, new_log_id: LogId<NID>) {
let l = self.key_log_ids.len();
if l == 0 {
self.key_log_ids.push(new_log_id);
return;
}

// l >= 1

assert!(new_log_id > self.key_log_ids[l - 1]);

if l == 1 {
self.key_log_ids.push(new_log_id);
return;
}

// l >= 2

let last = self.key_log_ids[l - 1];

if self.key_log_ids.get(l - 2).map(|x| x.leader_id) == Some(last.leader_id) {
// Replace the **last log id**.
self.key_log_ids[l - 1] = new_log_id;
return;
}

// The last one is an initial log entry of a leader.
// Add a **last log id** with the same leader id.

self.key_log_ids.push(new_log_id);
}

/// Get the log id at the specified index.
///
/// It will return `last_purged_log_id` if index is at the last purged index.
#[allow(dead_code)]
pub(crate) fn get(&self, index: u64) -> Option<LogId<NID>> {
let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index));

match res {
Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id, index)),
Err(i) => {
if i == 0 || i == self.key_log_ids.len() {
None
} else {
Some(LogId::new(self.key_log_ids[i - 1].leader_id, index))
}
}
}
}

pub(crate) fn key_log_ids(&self) -> &[LogId<NID>] {
&self.key_log_ids
}
}
110 changes: 110 additions & 0 deletions openraft/src/engine/log_id_list_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::engine::LogIdList;
use crate::LeaderId;
use crate::LogId;

#[test]
fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> {
let log_id = |t, i| LogId::<u64> {
leader_id: LeaderId { term: t, node_id: 1 },
index: i,
};

let mut ids = LogIdList::<u64>::default();

ids.extend_from_same_leader(&[log_id(1, 2)]);
assert_eq!(vec![log_id(1, 2)], ids.key_log_ids());

ids.extend_from_same_leader(&[
log_id(1, 3), //
log_id(1, 4),
]);
assert_eq!(
vec![
log_id(1, 2), //
log_id(1, 4)
],
ids.key_log_ids(),
"same leader as the last"
);

ids.extend_from_same_leader(&[
log_id(2, 5), //
log_id(2, 6),
log_id(2, 7),
]);
assert_eq!(
vec![
log_id(1, 2), //
log_id(2, 5),
log_id(2, 7)
],
ids.key_log_ids(),
"different leader as the last"
);

Ok(())
}

#[test]
fn test_log_id_list_append() -> anyhow::Result<()> {
let log_id = |t, i| LogId::<u64> {
leader_id: LeaderId { term: t, node_id: 1 },
index: i,
};

let mut ids = LogIdList::<u64>::default();

let cases = vec![
(log_id(1, 2), vec![log_id(1, 2)]), //
(log_id(1, 3), vec![log_id(1, 2), log_id(1, 3)]),
(log_id(1, 4), vec![log_id(1, 2), log_id(1, 4)]),
(log_id(2, 5), vec![log_id(1, 2), log_id(2, 5)]),
(log_id(2, 7), vec![log_id(1, 2), log_id(2, 5), log_id(2, 7)]),
(log_id(2, 9), vec![log_id(1, 2), log_id(2, 5), log_id(2, 9)]),
];

for (new_log_id, want) in cases {
ids.append(new_log_id);
assert_eq!(want, ids.key_log_ids());
}

Ok(())
}

#[test]
fn test_log_id_list_get_log_id() -> anyhow::Result<()> {
let log_id = |t, i| LogId::<u64> {
leader_id: LeaderId { term: t, node_id: 1 },
index: i,
};

let ids = LogIdList::<u64>::default();

assert!(ids.get(0).is_none());
assert!(ids.get(1).is_none());
assert!(ids.get(2).is_none());

let ids = LogIdList::<u64>::new(vec![
log_id(1, 1),
log_id(1, 2),
log_id(3, 3),
log_id(5, 6),
log_id(7, 8),
log_id(7, 10),
]);

assert_eq!(None, ids.get(0));
assert_eq!(Some(log_id(1, 1)), ids.get(1));
assert_eq!(Some(log_id(1, 2)), ids.get(2));
assert_eq!(Some(log_id(3, 3)), ids.get(3));
assert_eq!(Some(log_id(3, 4)), ids.get(4));
assert_eq!(Some(log_id(3, 5)), ids.get(5));
assert_eq!(Some(log_id(5, 6)), ids.get(6));
assert_eq!(Some(log_id(5, 7)), ids.get(7));
assert_eq!(Some(log_id(7, 8)), ids.get(8));
assert_eq!(Some(log_id(7, 9)), ids.get(9));
assert_eq!(Some(log_id(7, 10)), ids.get(10));
assert_eq!(None, ids.get(11));

Ok(())
}
4 changes: 4 additions & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ mod engine;

#[cfg(test)]
mod initialize_test;
mod log_id_list;
#[cfg(test)]
mod log_id_list_test;

pub(crate) use engine::Command;
pub(crate) use engine::Engine;
pub use log_id_list::LogIdList;
15 changes: 8 additions & 7 deletions openraft/src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

use crate::raft_types::RaftLogId;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
Expand All @@ -16,11 +17,7 @@ pub trait RaftPayload<NID: NodeId> {
}

/// Defines operations on an entry.
pub trait RaftEntry<NID: NodeId>: RaftPayload<NID> {
fn get_log_id(&self) -> &LogId<NID>;

fn set_log_id(&mut self, log_id: &LogId<NID>);
}
pub trait RaftEntry<NID: NodeId>: RaftPayload<NID> + RaftLogId<NID> {}

/// Log entry payload variants.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -185,7 +182,7 @@ impl<C: RaftTypeConfig> RaftPayload<C::NodeId> for Entry<C> {
}
}

impl<C: RaftTypeConfig> RaftEntry<C::NodeId> for Entry<C> {
impl<C: RaftTypeConfig> RaftLogId<C::NodeId> for Entry<C> {
fn get_log_id(&self) -> &LogId<C::NodeId> {
&self.log_id
}
Expand All @@ -195,6 +192,8 @@ impl<C: RaftTypeConfig> RaftEntry<C::NodeId> for Entry<C> {
}
}

impl<C: RaftTypeConfig> RaftEntry<C::NodeId> for Entry<C> {}

// impl traits for RefEntry

impl<'p, C: RaftTypeConfig> RaftPayload<C::NodeId> for EntryRef<'p, C> {
Expand All @@ -207,7 +206,7 @@ impl<'p, C: RaftTypeConfig> RaftPayload<C::NodeId> for EntryRef<'p, C> {
}
}

impl<'p, C: RaftTypeConfig> RaftEntry<C::NodeId> for EntryRef<'p, C> {
impl<'p, C: RaftTypeConfig> RaftLogId<C::NodeId> for EntryRef<'p, C> {
fn get_log_id(&self) -> &LogId<C::NodeId> {
&self.log_id
}
Expand All @@ -216,3 +215,5 @@ impl<'p, C: RaftTypeConfig> RaftEntry<C::NodeId> for EntryRef<'p, C> {
self.log_id = *log_id;
}
}

impl<'p, C: RaftTypeConfig> RaftEntry<C::NodeId> for EntryRef<'p, C> {}
Loading

0 comments on commit 675a0f8

Please sign in to comment.