Optimize restore_savepoint()
This restores and fixes the optimization that was removed in baa86e7

restore_savepoint() now scales with the number of database modifications
since the savepoint was captured, rather than with the total size of the database.

savepoint_benchmark is now ~35x faster for an 8GiB database file
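
For context, this is the pattern the optimization targets: capture a savepoint, make further modifications, then roll back. Below is a minimal sketch using redb's public API (Database, WriteTransaction::ephemeral_savepoint() and restore_savepoint()); the file name and the key/value types are illustrative.

use redb::{Database, Error, TableDefinition};

const TABLE: TableDefinition<u64, u64> = TableDefinition::new("data");

fn main() -> Result<(), Error> {
    let db = Database::create("example.redb")?;

    let mut txn = db.begin_write()?;
    {
        let mut table = txn.open_table(TABLE)?;
        table.insert(1u64, 10u64)?;
    }
    // Capture the current state of the database within this transaction
    let savepoint = txn.ephemeral_savepoint()?;
    {
        let mut table = txn.open_table(TABLE)?;
        table.insert(2u64, 20u64)?;
    }
    // Roll back to the savepoint. With this commit the cost is proportional to the
    // modifications made since the savepoint, not to the size of the database file.
    txn.restore_savepoint(&savepoint)?;
    txn.commit()?;

    // The insert made after the savepoint was rolled back
    assert!(db.begin_read()?.open_table(TABLE)?.get(2u64)?.is_none());
    Ok(())
}
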
cberner committed Dec 27, 2024
1 parent b27a9df commit 51ca54c
Showing 10 changed files with 265 additions and 93 deletions.
171 changes: 123 additions & 48 deletions src/transactions.rs
@@ -5,9 +5,9 @@ use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
Btree, BtreeHeader, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, Page,
PageHint, PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType,
TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
InternalTableDefinition, Page, PageHint, PageNumber, SerializedSavepoint, TableTree,
TableTreeMut, TableType, TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
};
use crate::types::{Key, Value};
use crate::{
@@ -25,7 +25,6 @@ use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::RangeBounds;
#[cfg(any(test, fuzzing))]
use std::ops::RangeFull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
@@ -563,34 +562,32 @@ impl WriteTransaction {
println!("Tracker page");
println!("{tracker:?}");

let table_allocators = self
.tables
let mut table_pages = vec![];
self.tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()
.visit_all_pages(|path| {
table_pages.push(path.page_number());
Ok(())
})
.unwrap();
let mut table_pages = vec![];
for (i, allocator) in table_allocators.iter().enumerate() {
allocator.get_allocated_pages(i.try_into().unwrap(), &mut table_pages);
}
println!("Tables");
for p in table_pages {
all_allocated.remove(&p);
println!("{p:?}");
}

let system_table_allocators = self
.system_tables
let mut system_table_pages = vec![];
self.system_tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()
.visit_all_pages(|path| {
system_table_pages.push(path.page_number());
Ok(())
})
.unwrap();
let mut system_table_pages = vec![];
for (i, allocator) in system_table_allocators.iter().enumerate() {
allocator.get_allocated_pages(i.try_into().unwrap(), &mut system_table_pages);
}
println!("System tables");
for p in system_table_pages {
all_allocated.remove(&p);
@@ -837,31 +834,51 @@ impl WriteTransaction {
// and new roots
// 3) update the system tree to remove invalid persistent savepoints.

let old_table_tree = TableTreeMut::new(
savepoint.get_user_root(),
let old_system_tree = TableTree::new(
savepoint.get_system_root(),
PageHint::None,
self.transaction_guard.clone(),
self.mem.clone(),
self.freed_pages.clone(),
);
// TODO: traversing these can be very slow in a large database. Speed this up.
let current_root_pages = self
.tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()?;
let old_root_pages = old_table_tree.all_referenced_pages()?;

// 1) restore the table tree
self.tables.lock().unwrap().table_tree = TableTreeMut::new(
savepoint.get_user_root(),
)?;
let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
savepoint.get_freed_root(),
PageHint::None,
self.transaction_guard.clone(),
self.mem.clone(),
self.freed_pages.clone(),
);
)?;

// 1a) filter any pages referenced by the old data root to bring them back to the committed state
let mut txn_id = savepoint.get_transaction_id().raw_id();
// Pages which are part of the system and freed trees in the savepoint should be freed
// even after the savepoint is restored, because the system and freed trees only roll
// forward
let mut old_system_and_freed_pages = HashSet::new();
old_system_tree.visit_all_pages(|path| {
old_system_and_freed_pages.insert(path.page_number());
Ok(())
})?;
old_freed_tree.visit_all_pages(|path| {
old_system_and_freed_pages.insert(path.page_number());
Ok(())
})?;

// 1) restore the table tree
{
self.tables.lock().unwrap().table_tree = TableTreeMut::new(
savepoint.get_user_root(),
self.transaction_guard.clone(),
self.mem.clone(),
self.freed_pages.clone(),
);
}

// 1a) purge all transactions that happened after the savepoint from the freed tree,
// except pages from the old system or freed tree in the savepoint. Those still need to be
// freed, since the system tree only rolls forward, never back. This brings all pages in the
// old data root back to the committed state
// This operation will also leak everything else that was allocated since the savepoint,
// but we fix that below -- noting that all the system trees that existed between the savepoint
// and now which might be referenced by other savepoints will become unreachable, since those
// savepoints are invalidated by this restoration
let mut txn_id = savepoint.get_transaction_id().next().raw_id();
let mut freed_tree = self.freed_tree.lock().unwrap();
loop {
let lower = FreedTableKey {
@@ -887,7 +904,8 @@
let item = entry?;
for i in 0..item.value().len() {
let p = item.value().get(i);
if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
// Keep the old system and freed tree pages, but purge anything else
if old_system_and_freed_pages.contains(&p) {
pending_pages.push(p);
}
}
@@ -917,19 +935,76 @@
txn_id += 1;
}

// 2) free all pages that became unreachable
let mut freed_pages = self.freed_pages.lock().unwrap();
for i in 0..current_root_pages.len() {
let mut pages = vec![];
current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
for page in pages {
if self.mem.uncommitted(page) {
self.mem.free(page);
let mut current_system_and_freed_pages = HashSet::new();
self.system_tables
.lock()
.unwrap()
.table_tree
.visit_all_pages(|path| {
current_system_and_freed_pages.insert(path.page_number());
Ok(())
})?;
freed_tree.visit_all_pages(|path| {
current_system_and_freed_pages.insert(path.page_number());
Ok(())
})?;

let mut old_allocators: Vec<BuddyAllocator> = savepoint
.get_regional_allocators()
.iter()
.map(|data| BuddyAllocator::from_savepoint_state(data))
.collect();

// Find the oldest transaction in the current freed tree, for use below
{
let oldest_unprocessed_transaction =
if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
entry?.key().transaction_id
} else {
freed_pages.push(page);
self.transaction_id.raw_id()
};

let lookup_key = FreedTableKey {
transaction_id: oldest_unprocessed_transaction,
pagination_id: 0,
};

// Replay all finalized frees into the old allocator state to ensure that a page which
// was pending free, freed, and then reallocated does not leak
for entry in old_freed_tree.range(&(..lookup_key))? {
let item = entry?;
let pages: FreedPageList = item.value();
for i in 0..pages.len() {
let page = pages.get(i);
assert!(old_allocators[page.region as usize]
.is_allocated(page.page_index, page.page_order));
old_allocators[page.region as usize].free(page.page_index, page.page_order);
}
}
}

// 2) free all pages that became unreachable
let mut freed_pages = self.freed_pages.lock().unwrap();
let mut already_awaiting_free: HashSet<PageNumber> = freed_pages.iter().copied().collect();
already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
for page in to_free {
if already_awaiting_free.contains(&page) {
// Make sure that we don't double free something that is already going to be freed
continue;
}
if current_system_and_freed_pages.contains(&page) {
// Don't free pages which are part of the current system or freed tree, even though
// these pages are new. Again this is because these trees only move forward;
// never backwards as part of a savepoint restore
continue;
}
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
}
}
drop(freed_pages);

// 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
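
The effect of the new "free all pages that became unreachable" step above can be shown with an intentionally simplified, standalone sketch: plain u64 page ids in HashSets stand in for redb's BuddyAllocator and PageNumber state, and the function names are illustrative rather than the crate's. The idea is a set difference between the current allocator state and the state captured in the savepoint, minus pages already queued for freeing and pages belonging to the current system/freed trees, which only roll forward.

use std::collections::HashSet;

// Simplified stand-in for the per-region allocator state: the set of allocated page ids.
type AllocatedPages = HashSet<u64>;

// Pages allocated since the savepoint are those allocated now but absent from the
// savepoint's captured state (the role pages_allocated_since_raw_state() plays above).
fn allocated_since(current: &AllocatedPages, at_savepoint: &AllocatedPages) -> Vec<u64> {
    current.difference(at_savepoint).copied().collect()
}

fn pages_to_free(
    current: &AllocatedPages,
    at_savepoint: &AllocatedPages,
    current_system_and_freed: &AllocatedPages,
    already_awaiting_free: &AllocatedPages,
) -> Vec<u64> {
    allocated_since(current, at_savepoint)
        .into_iter()
        // Don't double-free pages that are already queued for freeing
        .filter(|p| !already_awaiting_free.contains(p))
        // Keep pages of the current system and freed trees: those trees never roll back
        .filter(|p| !current_system_and_freed.contains(p))
        .collect()
}

fn main() {
    let at_savepoint: AllocatedPages = [1, 2, 3].into();
    let current: AllocatedPages = [1, 2, 3, 4, 5, 6].into();
    let current_system_and_freed: AllocatedPages = [6].into();
    let already_awaiting_free: AllocatedPages = [5].into();
    let mut result = pages_to_free(
        &current,
        &at_savepoint,
        &current_system_and_freed,
        &already_awaiting_free,
    );
    result.sort_unstable();
    // Only page 4 became unreachable and is safe to free
    assert_eq!(result, vec![4]);
}
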
21 changes: 14 additions & 7 deletions src/tree_store/btree.rs
@@ -383,13 +383,7 @@ impl<K: Key + 'static, V: Value + 'static> BtreeMut<'_, K, V> {
where
F: FnMut(&PagePath) -> Result,
{
let tree = UntypedBtree::new(
self.root,
self.mem.clone(),
K::fixed_width(),
V::fixed_width(),
);
tree.visit_all_pages(visitor)
self.read_tree()?.visit_all_pages(visitor)
}

pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
@@ -721,6 +715,19 @@ impl<K: Key, V: Value> Btree<K, V> {
self.root
}

pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
where
F: FnMut(&PagePath) -> Result,
{
let tree = UntypedBtree::new(
self.root,
self.mem.clone(),
K::fixed_width(),
V::fixed_width(),
);
tree.visit_all_pages(visitor)
}

pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'static, V>>> {
if let Some(ref root_page) = self.cached_root {
self.get_helper(root_page.clone(), K::as_bytes(key).as_ref())
4 changes: 2 additions & 2 deletions src/tree_store/mod.rs
@@ -17,8 +17,8 @@ pub(crate) use btree_base::{
pub(crate) use btree_iters::{AllPageNumbersBtreeIter, BtreeExtractIf, BtreeRangeIter};
pub use page_store::{file_backend, InMemoryBackend, Savepoint};
pub(crate) use page_store::{
Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory, FILE_FORMAT_VERSION2,
MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, PAGE_SIZE,
BuddyAllocator, Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory,
FILE_FORMAT_VERSION2, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, PAGE_SIZE,
};
pub(crate) use table_tree::{FreedPageList, FreedTableKey, TableTree, TableTreeMut};
pub(crate) use table_tree_base::{InternalTableDefinition, TableType};
1 change: 0 additions & 1 deletion src/tree_store/page_store/bitmap.rs
@@ -346,7 +346,6 @@ impl U64GroupedBitmap {
U64GroupedBitmapDifference::new(&self.data, &exclusion.data)
}

#[allow(dead_code)]
pub fn iter(&self) -> U64GroupedBitmapIter {
U64GroupedBitmapIter::new(self.len, &self.data)
}
56 changes: 55 additions & 1 deletion src/tree_store/page_store/buddy_allocator.rs
@@ -202,6 +202,43 @@ impl BuddyAllocator {
free_pages
}

// Inverse of make_state_for_savepoint()
pub(crate) fn from_savepoint_state(data: &[u8]) -> Self {
let mut offset = 0;
let max_order = data[offset];
offset += 1;
let len = u32::from_le_bytes(
data[offset..(offset + size_of::<u32>())]
.try_into()
.unwrap(),
);
offset += size_of::<u32>();

let mut data_start = offset + size_of::<u32>() * (max_order as usize + 1);
let mut allocated_sets = vec![];
for _ in 0..=max_order {
let data_end = u32::from_le_bytes(
data[offset..(offset + size_of::<u32>())]
.try_into()
.unwrap(),
) as usize;
offset += size_of::<u32>();
allocated_sets.push(U64GroupedBitmap::from_bytes(&data[data_start..data_end]));
data_start = data_end;
}
assert_eq!(data_start, data.len());

let mut result = Self::new(len, allocated_sets[0].capacity());

for (order, allocated) in allocated_sets.iter().enumerate() {
for page_number in allocated.iter() {
result.record_alloc(page_number, order.try_into().unwrap());
}
}

result
}

// Reduced state for savepoint, which includes only the list of allocated pages
// Format:
// 1 byte: max order
@@ -246,7 +283,6 @@
}
}

#[cfg(any(test, fuzzing))]
pub(crate) fn get_allocated_pages(&self, region: u32, output: &mut Vec<PageNumber>) {
for order in 0..=self.max_order {
let allocated = self.get_order_allocated(order);
@@ -612,6 +648,24 @@ mod test {
assert_eq!(allocator.count_allocated_pages(), 0);
}

#[test]
fn make_savepoint_state() {
let num_pages = 256;
let mut allocator = BuddyAllocator::new(num_pages, num_pages);

// Allocate some arbitrary stuff
allocator.record_alloc(7, 0);
allocator.alloc(0);
allocator.alloc(1);
allocator.alloc(3);
allocator.alloc(0);
allocator.alloc(0);

let allocator2 =
BuddyAllocator::from_savepoint_state(&allocator.make_state_for_savepoint());
assert_eq!(allocator.to_vec(), allocator2.to_vec());
}

#[test]
fn serialized_size() {
// Check that serialized size is as expected for a full region
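
For reference, the byte layout parsed by from_savepoint_state() above is: one byte for the max order, a little-endian u32 page count, one absolute little-endian u32 end offset per order, then the per-order allocation bitmaps. The encoder below is an illustrative standalone sketch of that layout; it is not the crate's make_state_for_savepoint(), which serializes U64GroupedBitmap buffers.

use std::mem::size_of;

// [max_order: u8][len: u32 LE][absolute end offset per order: u32 LE x (max_order + 1)][bitmaps...]
fn encode_savepoint_state(len: u32, order_bitmaps: &[Vec<u8>]) -> Vec<u8> {
    let max_order: u8 = (order_bitmaps.len() - 1).try_into().unwrap();
    let mut out = vec![max_order];
    out.extend_from_slice(&len.to_le_bytes());

    // One absolute end offset per order, pointing just past that order's bitmap bytes
    let header_len = 1 + size_of::<u32>() + size_of::<u32>() * order_bitmaps.len();
    let mut end = header_len;
    for bitmap in order_bitmaps {
        end += bitmap.len();
        out.extend_from_slice(&(end as u32).to_le_bytes());
    }
    for bitmap in order_bitmaps {
        out.extend_from_slice(bitmap);
    }
    out
}

fn main() {
    // Two orders with 8-byte bitmaps each, for an allocator covering 64 pages
    let bitmaps = vec![vec![0u8; 8], vec![0u8; 8]];
    let data = encode_savepoint_state(64, &bitmaps);
    assert_eq!(data[0], 1); // max order
    assert_eq!(data.len(), 1 + 4 + 4 * 2 + 16);
}
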
3 changes: 1 addition & 2 deletions src/tree_store/page_store/mod.rs
@@ -21,6 +21,5 @@ pub use savepoint::Savepoint;
pub(crate) use savepoint::SerializedSavepoint;

pub(super) use base::{PageImpl, PageMut};
pub(super) use buddy_allocator::BuddyAllocator;
pub(super) use region::new_allocators;
pub(crate) use buddy_allocator::BuddyAllocator;
pub(super) use xxh3::hash128_with_seed;