diff --git a/src/db.rs b/src/db.rs index f5417f53..fedb2984 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use crate::error::TransactionError; -use crate::multimap_table::parse_subtree_roots; +use crate::multimap_table::{parse_subtree_roots, DynamicCollection}; use crate::sealed::Sealed; use crate::transactions::SAVEPOINT_TABLE; #[cfg(feature = "logging")] @@ -263,76 +263,24 @@ impl Database { self.mem.set_crash_countdown(value); } - fn verify_primary_checksums(mem: &TransactionalMemory) -> Result { - if let Some((root, root_checksum)) = mem.get_data_root() { - if !RawBtree::new( - Some((root, root_checksum)), - <&str>::fixed_width(), - InternalTableDefinition::fixed_width(), - mem, - ) - .verify_checksum()? - { - return Ok(false); - } - - // Iterate over all other tables - let iter: BtreeRangeIter<&str, InternalTableDefinition> = - BtreeRangeIter::new::(&(..), Some(root), mem)?; - for entry in iter { - let definition = entry?.value(); - if let Some((table_root, table_checksum)) = definition.get_root() { - if !RawBtree::new( - Some((table_root, table_checksum)), - definition.get_fixed_key_size(), - definition.get_fixed_value_size(), - mem, - ) - .verify_checksum()? - { - return Ok(false); - } - } - } + pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result { + let fake_freed_pages = Arc::new(Mutex::new(vec![])); + let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone()); + if !table_tree.verify_checksums()? { + return Ok(false); } - - if let Some((root, root_checksum)) = mem.get_system_root() { - if !RawBtree::new( - Some((root, root_checksum)), - <&str>::fixed_width(), - InternalTableDefinition::fixed_width(), - mem, - ) - .verify_checksum()? - { - return Ok(false); - } - - // Iterate over all other tables - let iter: BtreeRangeIter<&str, InternalTableDefinition> = - BtreeRangeIter::new::(&(..), Some(root), mem)?; - for entry in iter { - let definition = entry?.value(); - if let Some((table_root, table_checksum)) = definition.get_root() { - if !RawBtree::new( - Some((table_root, table_checksum)), - definition.get_fixed_key_size(), - definition.get_fixed_value_size(), - mem, - ) - .verify_checksum()? - { - return Ok(false); - } - } - } + let system_table_tree = + TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone()); + if !system_table_tree.verify_checksums()? { + return Ok(false); } + assert!(fake_freed_pages.lock().unwrap().is_empty()); if let Some((freed_root, freed_checksum)) = mem.get_freed_root() { if !RawBtree::new( Some((freed_root, freed_checksum)), FreedTableKey::fixed_width(), - None, + FreedPageList::fixed_width(), mem, ) .verify_checksum()? @@ -499,34 +447,44 @@ impl Database { for entry in iter { let definition = entry?.value(); if let Some((table_root, _)) = definition.get_root() { - let table_pages_iter = AllPageNumbersBtreeIter::new( - table_root, - definition.get_fixed_key_size(), - definition.get_fixed_value_size(), - mem, - )?; - mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?; - - // Multimap tables may have additional subtrees in their values - if definition.get_type() == TableType::Multimap { - let table_pages_iter = AllPageNumbersBtreeIter::new( - table_root, - definition.get_fixed_key_size(), - definition.get_fixed_value_size(), - mem, - )?; - for table_page in table_pages_iter { - let page = mem.get_page(table_page?)?; - let subtree_roots = - parse_subtree_roots(&page, definition.get_fixed_key_size()); - for sub_root in subtree_roots { - let sub_root_iter = AllPageNumbersBtreeIter::new( - sub_root, - definition.get_fixed_value_size(), - <()>::fixed_width(), - mem, - )?; - mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?; + match definition.get_type() { + TableType::Normal => { + let table_pages_iter = AllPageNumbersBtreeIter::new( + table_root, + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + mem, + )?; + mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?; + } + TableType::Multimap => { + let table_pages_iter = AllPageNumbersBtreeIter::new( + table_root, + definition.get_fixed_key_size(), + <&DynamicCollection>::fixed_width(), + mem, + )?; + mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?; + + let table_pages_iter = AllPageNumbersBtreeIter::new( + table_root, + definition.get_fixed_key_size(), + <&DynamicCollection>::fixed_width(), + mem, + )?; + for table_page in table_pages_iter { + let page = mem.get_page(table_page?)?; + let subtree_roots = + parse_subtree_roots(&page, definition.get_fixed_key_size()); + for (sub_root, _) in subtree_roots { + let sub_root_iter = AllPageNumbersBtreeIter::new( + sub_root, + definition.get_fixed_value_size(), + <()>::fixed_width(), + mem, + )?; + mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?; + } } } } diff --git a/src/multimap_table.rs b/src/multimap_table.rs index 1642c849..ef7cc735 100644 --- a/src/multimap_table.rs +++ b/src/multimap_table.rs @@ -2,8 +2,8 @@ use crate::multimap_table::DynamicCollectionType::{Inline, Subtree}; use crate::sealed::Sealed; use crate::tree_store::{ AllPageNumbersBtreeIter, Btree, BtreeMut, BtreeRangeIter, Checksum, LeafAccessor, LeafMutator, - Page, PageHint, PageNumber, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, LEAF, - MAX_VALUE_LENGTH, + Page, PageHint, PageNumber, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, + BRANCH, LEAF, MAX_VALUE_LENGTH, }; use crate::types::{RedbKey, RedbValue, TypeName}; use crate::{AccessGuard, Result, StorageError, WriteTransaction}; @@ -15,6 +15,48 @@ use std::mem::size_of; use std::ops::{RangeBounds, RangeFull}; use std::sync::{Arc, Mutex}; +// Verify all the checksums in the tree, including any Dynamic collection subtrees +pub(crate) fn verify_tree_and_subtree_checksums( + root: Option<(PageNumber, Checksum)>, + key_size: Option, + value_size: Option, + mem: &TransactionalMemory, +) -> Result { + if let Some((root, root_checksum)) = root { + if !RawBtree::new( + Some((root, root_checksum)), + key_size, + <&DynamicCollection>::fixed_width(), + mem, + ) + .verify_checksum()? + { + return Ok(false); + } + + let table_pages_iter = + AllPageNumbersBtreeIter::new(root, key_size, <&DynamicCollection>::fixed_width(), mem)?; + for table_page in table_pages_iter { + let page = mem.get_page(table_page?)?; + let subtree_roots = parse_subtree_roots(&page, key_size); + for (sub_root, sub_root_checksum) in subtree_roots { + if !RawBtree::new( + Some((sub_root, sub_root_checksum)), + value_size, + <()>::fixed_width(), + mem, + ) + .verify_checksum()? + { + return Ok(false); + } + } + } + } + + Ok(true) +} + // Finalize all the checksums in the tree, including any Dynamic collection subtrees // Returns the root checksum pub(crate) fn finalize_tree_and_subtree_checksums( @@ -80,7 +122,7 @@ pub(crate) fn finalize_tree_and_subtree_checksums( pub(crate) fn parse_subtree_roots( page: &T, fixed_key_size: Option, -) -> Vec { +) -> Vec<(PageNumber, Checksum)> { match page.memory()[0] { BRANCH => { vec![] @@ -96,7 +138,7 @@ pub(crate) fn parse_subtree_roots( let entry = accessor.entry(i).unwrap(); let collection = <&DynamicCollection>::from_bytes(entry.value()); if matches!(collection.collection_type(), DynamicCollectionType::Subtree) { - result.push(collection.as_subtree().0); + result.push(collection.as_subtree()); } } @@ -201,7 +243,7 @@ impl Into for DynamicCollectionType { /// (when type = 2) checksum (16 bytes): sub tree checksum #[derive(Debug)] #[repr(transparent)] -struct DynamicCollection { +pub(crate) struct DynamicCollection { data: [u8], // TODO: include V type when GATs are stable // _value_type: PhantomData, diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index 62be2fb8..1cb96aba 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -234,6 +234,16 @@ impl<'a, K: RedbKey + 'a, V: RedbValue + 'a> BtreeMut<'a, K, V> { } } + pub(crate) fn verify_checksum(&self) -> Result { + RawBtree::new( + self.get_root(), + K::fixed_width(), + V::fixed_width(), + self.mem, + ) + .verify_checksum() + } + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result { let mut tree = UntypedBtreeMut::new( self.get_root(), diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index 29fbab2e..0216c945 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -1,9 +1,11 @@ use crate::error::TableError; -use crate::multimap_table::finalize_tree_and_subtree_checksums; +use crate::multimap_table::{ + finalize_tree_and_subtree_checksums, verify_tree_and_subtree_checksums, +}; use crate::tree_store::btree::{btree_stats, UntypedBtreeMut}; use crate::tree_store::btree_base::Checksum; use crate::tree_store::btree_iters::AllPageNumbersBtreeIter; -use crate::tree_store::{BtreeMut, BtreeRangeIter, PageNumber, TransactionalMemory}; +use crate::tree_store::{BtreeMut, BtreeRangeIter, PageNumber, RawBtree, TransactionalMemory}; use crate::types::{RedbKey, RedbValue, RedbValueMutInPlace, TypeName}; use crate::{DatabaseStats, Result}; use std::cmp::max; @@ -476,6 +478,48 @@ impl<'txn> TableTree<'txn> { self.pending_table_updates.clear(); } + pub(crate) fn verify_checksums(&self) -> Result { + assert!(self.pending_table_updates.is_empty()); + if !self.tree.verify_checksum()? { + return Ok(false); + } + + for entry in self.tree.range::(&(..))? { + let entry = entry?; + let definition = entry.value(); + // TODO: all these matches on table_type() are pretty errorprone, because you can call .get_fixed_value_size() on either. + // Maybe InternalTableDefinition should be an enum for normal and multimap, instead + match definition.get_type() { + TableType::Normal => { + if let Some((table_root, table_checksum)) = definition.get_root() { + if !RawBtree::new( + Some((table_root, table_checksum)), + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + self.mem, + ) + .verify_checksum()? + { + return Ok(false); + } + } + } + TableType::Multimap => { + if !verify_tree_and_subtree_checksums( + definition.get_root(), + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + self.mem, + )? { + return Ok(false); + } + } + } + } + + Ok(true) + } + pub(crate) fn flush_table_root_updates(&mut self) -> Result> { for (name, table_root) in self.pending_table_updates.drain() { // Bypass .get_table() since the table types are dynamic