fix append recovery bug (#251)
Close #250

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie authored Jul 26, 2022
1 parent ee024c7 commit 70d27ee
Showing 2 changed files with 86 additions and 24 deletions.
106 changes: 82 additions & 24 deletions src/memtable.rs
@@ -408,28 +408,25 @@ impl<A: AllocatorTrait> MemTable<A> {
}
}

- /// Appends some entries from rewrite queue. Assumes this table has no
- /// append data.
+ /// Appends some entries from append queue. Assumes this table has no
+ /// rewrite data.
///
/// This method is only used for recovery.
- pub fn append_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
+ pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
- debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
+ debug_assert_eq!(self.rewrite_count, 0);
self.prepare_append(
entry_indexes[0].index,
- // Rewrite -> Compact Append -> Rewrite.
- true, /* allow_hole */
- // Refer to case in `merge_append_table`. They can be adapted
- // to attack this path via a global rewrite without deleting
- // obsolete rewrite files.
+ false, /* allow_hole */
+ // Refer to case in `merge_newer_neighbor`.
true, /* allow_overwrite */
);
- self.global_stats.add(LogQueue::Rewrite, len);
+ self.global_stats.add(LogQueue::Append, len);
for ei in &entry_indexes {
+ debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
self.entry_indexes.push_back(ei.into());
}
- self.rewrite_count = self.entry_indexes.len();
}
}
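For readers following the flag change above, the sketch below is a minimal, self-contained model of the kind of admission check the `allow_hole` / `allow_overwrite` flags stand for, written only to make the flag semantics concrete. The function `check_append` and its types are invented for illustration; this is not the crate's actual `prepare_append`.

// A minimal, hypothetical model of the check behind the `allow_hole` /
// `allow_overwrite` flags above; not the crate's actual `prepare_append`.
fn check_append(
    existing: &std::ops::Range<u64>, // indexes currently held, as [start, end)
    first_new: u64,                  // first index of the incoming batch
    allow_hole: bool,
    allow_overwrite: bool,
) -> Result<(), String> {
    if existing.is_empty() || first_new == existing.end {
        return Ok(()); // empty table or contiguous append
    }
    if first_new > existing.end {
        // Gap between the last held index and the incoming batch.
        if allow_hole {
            Ok(()) // e.g. Rewrite -> Compact Append -> Rewrite
        } else {
            Err(format!("hole between {} and {}", existing.end, first_new))
        }
    } else {
        // Incoming batch rewinds into indexes that are already held.
        if allow_overwrite {
            Ok(()) // e.g. a conflicting raft suffix being re-appended
        } else {
            Err(format!("overwrite at {}", first_new))
        }
    }
}

fn main() {
    // Append replay after this commit: holes rejected, overwrites allowed.
    assert!(check_append(&(1..11), 5, false, true).is_ok());
    assert!(check_append(&(1..11), 15, false, true).is_err());
    // Rewrite replay keeps allowing holes.
    assert!(check_append(&(1..11), 15, true, true).is_ok());
}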

@@ -507,6 +504,31 @@ impl<A: AllocatorTrait> MemTable<A> {
self.rewrite_count = pos + rewrite_len;
}

+ /// Appends some entries from rewrite queue. Assumes this table has no
+ /// append data.
+ ///
+ /// This method is only used for recovery.
+ pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
+ let len = entry_indexes.len();
+ if len > 0 {
+ debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
+ self.prepare_append(
+ entry_indexes[0].index,
+ // Rewrite -> Compact Append -> Rewrite.
+ true, /* allow_hole */
+ // Refer to case in `merge_append_table`. They can be adapted
+ // to attack this path via a global rewrite without deleting
+ // obsolete rewrite files.
+ true, /* allow_overwrite */
+ );
+ self.global_stats.add(LogQueue::Rewrite, len);
+ for ei in &entry_indexes {
+ self.entry_indexes.push_back(ei.into());
+ }
+ self.rewrite_count = self.entry_indexes.len();
+ }
+ }

/// Removes all entries with index smaller than `index`. Returns the number
/// of deleted entries.
pub fn compact_to(&mut self, index: u64) -> u64 {
@@ -1059,6 +1081,38 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
}
}

+ /// Applies changes from log items that are replayed from an append queue.
+ /// Assumes it hasn't applied any rewrite data.
+ ///
+ /// This method is only used for recovery.
+ pub fn replay_append_writes(&self, log_items: LogItemDrain) {
+ for item in log_items {
+ let raft = item.raft_group_id;
+ let memtable = self.get_or_insert(raft);
+ match item.content {
+ LogItemContent::EntryIndexes(entries_to_add) => {
+ memtable.write().replay_append(entries_to_add.0);
+ }
+ LogItemContent::Command(Command::Clean) => {
+ self.remove(raft, true /* record_tombstone */);
+ }
+ LogItemContent::Command(Command::Compact { index }) => {
+ memtable.write().compact_to(index);
+ }
+ LogItemContent::Kv(kv) => match kv.op_type {
+ OpType::Put => {
+ let value = kv.value.unwrap();
+ memtable.write().put(kv.key, value, kv.file_id.unwrap());
+ }
+ OpType::Del => {
+ let key = kv.key;
+ memtable.write().delete(key.as_slice());
+ }
+ },
+ }
+ }
+ }
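The new dispatch above treats `Command::Clean` differently on the two queues: the append replay path records a tombstone (`record_tombstone = true`), while the rewrite replay path further down passes `false`. The toy sketch that follows only illustrates that structural difference; `Item` and `Tables` are invented stand-ins rather than the crate's `LogItemContent` handling, and the stated rationale (keeping older rewrite data from resurrecting a cleaned group) is an assumption drawn from the surrounding code.

use std::collections::{HashMap, HashSet};

// Invented, simplified stand-ins for the log item types; this only mirrors
// the dispatch structure above and is not the crate's API.
enum Item {
    Entries(Vec<u64>),
    Clean,
    Compact { index: u64 },
}

#[derive(Default)]
struct Tables {
    entries: HashMap<u64, Vec<u64>>, // raft_group_id -> entry indexes
    tombstones: HashSet<u64>,        // groups cleaned while replaying append data
}

impl Tables {
    fn replay(&mut self, raft: u64, item: Item, record_tombstone: bool) {
        match item {
            Item::Entries(idxs) => self.entries.entry(raft).or_default().extend(idxs),
            Item::Clean => {
                self.entries.remove(&raft);
                // Mirrors the diff: only the append replay path records a
                // tombstone for a cleaned group.
                if record_tombstone {
                    self.tombstones.insert(raft);
                }
            }
            Item::Compact { index } => {
                if let Some(v) = self.entries.get_mut(&raft) {
                    v.retain(|i| *i >= index);
                }
            }
        }
    }
}

fn main() {
    let mut t = Tables::default();
    t.replay(7, Item::Entries(vec![1, 2, 3]), true); // append replay
    t.replay(7, Item::Compact { index: 2 }, true);   // append replay
    t.replay(7, Item::Clean, true);                  // append replay: tombstone kept
    t.replay(8, Item::Clean, false);                 // rewrite replay: no tombstone
    assert!(t.tombstones.contains(&7) && !t.tombstones.contains(&8));
}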

/// Applies changes from log items that have been written to rewrite queue.
pub fn apply_rewrite_writes(
&self,
@@ -1090,15 +1144,16 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
/// Assumes it hasn't applied any append data.
///
/// This method is only used for recovery.
- pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) {
+ pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
- memtable.write().append_rewrite(entries_to_add.0);
+ memtable.write().replay_rewrite(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
+ // Only append tombstone needs to be recorded.
self.remove(raft, false /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
@@ -1184,21 +1239,17 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
}
}
match file_id.queue {
- LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()),
- LogQueue::Rewrite => self
- .memtables
- .apply_replayed_rewrite_writes(item_batch.drain()),
+ LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
+ LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
}
Ok(())
}

fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
self.log_batch.merge(&mut rhs.log_batch.clone());
match queue {
- LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()),
- LogQueue::Rewrite => self
- .memtables
- .apply_replayed_rewrite_writes(rhs.log_batch.drain()),
+ LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
+ LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
}
self.memtables.merge_newer_neighbor(rhs.memtables);
Ok(())
@@ -2045,7 +2096,7 @@ mod tests {
memtable.compact_to(7);
}
Some(LogQueue::Rewrite) => {
- memtable.append_rewrite(generate_entry_indexes(
+ memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@@ -2087,7 +2138,7 @@ mod tests {
memtable.compact_to(10);
}
Some(LogQueue::Rewrite) => {
- memtable.append_rewrite(generate_entry_indexes(
+ memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@@ -2138,7 +2189,7 @@ mod tests {
memtable.merge_newer_neighbor(&mut m1);
}
Some(LogQueue::Rewrite) => {
- memtable.append_rewrite(generate_entry_indexes(
+ memtable.replay_rewrite(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Rewrite, 1),
@@ -2220,6 +2271,13 @@ mod tests {
batches[1].add_command(last_rid, Command::Compact { index: 5 });
batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));

+ // entries [1, 10] => entries [11, 20][5, 10] => compact 8
+ last_rid += 1;
+ batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
+ batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
+ batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
+ batches[2].add_command(last_rid, Command::Compact { index: 8 });

for b in batches.iter_mut() {
b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
}
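The new test case above feeds one region entries [1, 10], then [11, 20] followed by an overlapping [5, 10] in a later file, then a `Compact { index: 8 }`. As a rough reading aid, here is a self-contained sketch of the index bookkeeping under the assumption that an overwriting append truncates back to its first index before re-appending; `Window` is an invented toy type, not the crate's memtable.

// Toy model of a contiguous index window, assuming an overwriting append
// truncates everything from its first index onward before re-appending.
// `Window` is illustrative only and not the crate's memtable.
#[derive(Debug, Default, PartialEq)]
struct Window {
    first: u64,
    last: u64, // inclusive; first == 0 && last == 0 means empty here
}

impl Window {
    fn append(&mut self, first: u64, last: u64) {
        if self.last != 0 && first <= self.last {
            // Overwrite: rewind to `first`, dropping the stale tail.
            self.last = first - 1;
        }
        if self.last == 0 {
            self.first = first;
        }
        self.last = last;
    }

    fn compact_to(&mut self, index: u64) {
        // Drop entries with index smaller than `index`.
        self.first = self.first.max(index);
    }
}

fn main() {
    let mut w = Window::default();
    w.append(1, 10);  // entries [1, 10]
    w.append(11, 20); // entries [11, 20]
    w.append(5, 10);  // overwriting append rewinds to 5, then re-appends [5, 10]
    w.compact_to(8);  // compact 8
    assert_eq!(w, Window { first: 8, last: 10 });
}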
4 changes: 4 additions & 0 deletions tests/failpoints/test_engine.rs
@@ -518,7 +518,11 @@ fn test_concurrent_write_perf_context() {
}
}

+ // FIXME: this test no longer works because recovery cannot reliably detect
+ // overwrite anomaly.
+ // See https://github.com/tikv/raft-engine/issues/250
#[test]
+ #[should_panic]
fn test_recycle_with_stale_logbatch_at_tail() {
let dir = tempfile::Builder::new()
.prefix("test_recycle_with_stale_log_batch_at_tail")
