Skip to content

Commit

Permalink
Fix panic when repairing certain databases
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Jun 29, 2023
1 parent 71b66c9 commit 37ed10a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
19 changes: 14 additions & 5 deletions src/tree_store/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<'a> UntypedBtreeMut<'a> {
let mut page = self.mem.get_page_mut(page_number)?;

match page.memory()[0] {
LEAF => Ok(leaf_checksum(&page, self.key_width, self.value_width)),
LEAF => leaf_checksum(&page, self.key_width, self.value_width),
BRANCH => {
let accessor = BranchAccessor::new(&page, self.key_width);
let mut new_children = vec![];
Expand All @@ -100,7 +100,7 @@ impl<'a> UntypedBtreeMut<'a> {
}
drop(mutator);

Ok(branch_checksum(&page, self.key_width))
branch_checksum(&page, self.key_width)
}
_ => unreachable!(),
}
Expand Down Expand Up @@ -506,11 +506,20 @@ impl<'a> RawBtree<'a> {
let node_mem = page.memory();
Ok(match node_mem[0] {
LEAF => {
expected_checksum
== leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
if let Ok(computed) =
leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
{
expected_checksum == computed
} else {
false
}
}
BRANCH => {
if expected_checksum != branch_checksum(&page, self.fixed_key_size) {
if let Ok(computed) = branch_checksum(&page, self.fixed_key_size) {
if expected_checksum != computed {
return Ok(false);
}
} else {
return Ok(false);
}
let accessor = BranchAccessor::new(&page, self.fixed_key_size);
Expand Down
35 changes: 26 additions & 9 deletions src/tree_store/btree_base.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::tree_store::page_store::{xxh3_checksum, Page, PageImpl, PageMut, TransactionalMemory};
use crate::tree_store::PageNumber;
use crate::types::{RedbKey, RedbValue, RedbValueMutInPlace};
use crate::Result;
use crate::{Result, StorageError};
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::mem::size_of;
Expand All @@ -19,20 +19,37 @@ pub(super) fn leaf_checksum<T: Page>(
page: &T,
fixed_key_size: Option<usize>,
fixed_value_size: Option<usize>,
) -> Checksum {
) -> Result<Checksum, StorageError> {
let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
// TODO: during verification, the page could be corrupted, so this needs to be safe on
// arbitrary data
let end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
xxh3_checksum(&page.memory()[..end])
if end > page.memory().len() {
Err(StorageError::Corrupted(format!(
"Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
page.get_page_number(),
end,
page.memory().len()
)))
} else {
Ok(xxh3_checksum(&page.memory()[..end]))
}
}

pub(super) fn branch_checksum<T: Page>(page: &T, fixed_key_size: Option<usize>) -> Checksum {
pub(super) fn branch_checksum<T: Page>(
page: &T,
fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
let accessor = BranchAccessor::new(page, fixed_key_size);
// TODO: during verification, the page could be corrupted, so this needs to be safe on
// arbitrary data
let end = accessor.key_end(accessor.num_keys() - 1);
xxh3_checksum(&page.memory()[..end])
if end > page.memory().len() {
Err(StorageError::Corrupted(format!(
"Branch page {:?} corrupted. Last offset {} beyond end of data {}",
page.get_page_number(),
end,
page.memory().len()
)))
} else {
Ok(xxh3_checksum(&page.memory()[..end]))
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down

0 comments on commit 37ed10a

Please sign in to comment.