Skip to content

Commit

Permalink
change: remove PurgedMarker. keep logs clean
Browse files Browse the repository at this point in the history
Changing log(add a PurgedMarker(original SnapshotPointer)) makes it
diffeicult to impl `install-snapshot` for a RaftStore without a lock
protecting both logs and state machine.

Adding a PurgedMarker and installing the snapshot has to be atomic in
storage layer. But usually logs and state machine are separated store.
e.g., logs are stored in fast flash disk and state machine is stored
some where else.

To get rid of the big lock, PurgedMarker is removed and installing a
snaphost does not need to keep consistent with logs any more.
  • Loading branch information
drmingdrmer committed Sep 9, 2021
1 parent 9915089 commit deda6d7
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 24 deletions.
1 change: 0 additions & 1 deletion async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
EntryPayload::Blank => {}
EntryPayload::Normal(_) => {}
EntryPayload::PurgedMarker => {}
}
}

Expand Down
13 changes: 0 additions & 13 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,16 +510,6 @@ pub struct Entry<D: AppData> {
pub payload: EntryPayload<D>,
}

impl<D: AppData> Entry<D> {
/// Create a new snapshot pointer from the given snapshot meta.
pub fn new_purged_marker(log_id: LogId) -> Self {
Entry {
log_id,
payload: EntryPayload::PurgedMarker,
}
}
}

impl<D: AppData> MessageSummary for Entry<D> {
fn summary(&self) -> String {
format!("{}:{}", self.log_id, self.payload.summary())
Expand Down Expand Up @@ -557,8 +547,6 @@ pub enum EntryPayload<D: AppData> {
Normal(EntryNormal<D>),
/// A config change log entry.
ConfigChange(EntryConfigChange),
/// An entry before which all logs are removed.
PurgedMarker,
}

impl<D: AppData> MessageSummary for EntryPayload<D> {
Expand All @@ -569,7 +557,6 @@ impl<D: AppData> MessageSummary for EntryPayload<D> {
EntryPayload::ConfigChange(c) => {
format!("config-change: {:?}", c.membership)
}
EntryPayload::PurgedMarker => "purged-marker".to_string(),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// This condition would only ever be reached if the log has been removed due to
// log compaction (barring critical storage failure), so transition to snapshotting.
self.set_target_state(TargetReplState::Snapshotting);
return;
}
};
}
Expand Down
6 changes: 3 additions & 3 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,11 @@ impl RaftRouter {
) {
let rt = self.routing_table.read().await;
for (id, (_node, storage)) in rt.iter() {
let last_log = storage.get_log_entries(..).await.unwrap().last().unwrap().log_id.index;
let last_log_id = storage.last_log_id().await;
assert_eq!(
last_log, expect_last_log,
expect_last_log, last_log_id.index,
"expected node {} to have last_log {}, got {}",
id, expect_last_log, last_log
id, expect_last_log, last_log_id.index
);

let hs = storage.read_hard_state().await.unwrap_or_else(|| panic!("no hard state found for node {}", id));
Expand Down
6 changes: 0 additions & 6 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,6 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

match entry.payload {
EntryPayload::Blank => res.push(ClientResponse(None)),
EntryPayload::PurgedMarker => {
return Err(anyhow::anyhow!("PurgedMarker should never be passed to state machine"));
}
EntryPayload::Normal(ref norm) => {
let data = &norm.data;
if let Some((serial, r)) = sm.client_serial_responses.get(&data.client) {
Expand Down Expand Up @@ -773,9 +770,6 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
// Remove logs that are included in the snapshot.
// Leave at least one log or the replication can not find out the mismatched log.
*log = log.split_off(&meta.last_log_id.index);

// In case there are no log at all, a marker log need to be added to indicate the last log.
log.insert(meta.last_log_id.index, Entry::new_purged_marker(meta.last_log_id));
}

// Update the state machine.
Expand Down

0 comments on commit deda6d7

Please sign in to comment.