Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

ancient append vec append creates big enough ancient append vecs #34154

Merged
merged 16 commits into from
Dec 5, 2023
212 changes: 189 additions & 23 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use {
AccountStorage, AccountStorageStatus, ShrinkInProgress,
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_file::{AccountsFile, AccountsFileError, MatchAccountOwnerError},
accounts_file::{
AccountsFile, AccountsFileError, MatchAccountOwnerError, ALIGN_BOUNDARY_OFFSET,
},
accounts_hash::{
AccountHash, AccountsDeltaHash, AccountsHash, AccountsHashKind, AccountsHasher,
CalcAccountsHashConfig, CalculateHashIntermediate, HashStats, IncrementalAccountsHash,
Expand Down Expand Up @@ -66,6 +68,7 @@ use {
rent_collector::RentCollector,
sorted_storages::SortedStorages,
storable_accounts::StorableAccounts,
u64_align,
verify_accounts_hash_in_background::VerifyAccountsHashInBackground,
},
blake3::traits::digest::Digest,
Expand Down Expand Up @@ -334,13 +337,17 @@ impl CurrentAncientAppendVec {
}
}

/// Create ancient append vec for a slot
/// min_bytes: the new append vec needs to have at least this capacity
#[must_use]
fn create_ancient_append_vec<'a>(
&mut self,
slot: Slot,
db: &'a AccountsDb,
min_bytes: usize,
) -> ShrinkInProgress<'a> {
let shrink_in_progress = db.get_store_for_shrink(slot, get_ancient_append_vec_capacity());
let size = get_ancient_append_vec_capacity().max(min_bytes as u64);
jeffwashington marked this conversation as resolved.
Show resolved Hide resolved
let shrink_in_progress = db.get_store_for_shrink(slot, size);
*self = Self::new(slot, Arc::clone(shrink_in_progress.new_storage()));
shrink_in_progress
}
Expand All @@ -349,9 +356,10 @@ impl CurrentAncientAppendVec {
&mut self,
slot: Slot,
db: &'a AccountsDb,
min_bytes: usize,
) -> Option<ShrinkInProgress<'a>> {
if self.slot_and_append_vec.is_none() {
Some(self.create_ancient_append_vec(slot, db))
Some(self.create_ancient_append_vec(slot, db, min_bytes))
} else {
None
}
Expand All @@ -368,21 +376,31 @@ impl CurrentAncientAppendVec {
}

/// helper function to cleanup call to 'store_accounts_frozen'
/// return timing and bytes written
fn store_ancient_accounts(
&self,
db: &AccountsDb,
accounts_to_store: &AccountsToStore,
storage_selector: StorageSelector,
) -> StoreAccountsTiming {
) -> (StoreAccountsTiming, u64) {
let accounts = accounts_to_store.get(storage_selector);

db.store_accounts_frozen(
let previous_available = self.append_vec().accounts.remaining_bytes();
let timing = db.store_accounts_frozen(
(self.slot(), accounts, accounts_to_store.slot()),
None::<Vec<AccountHash>>,
self.append_vec(),
None,
StoreReclaims::Ignore,
)
);
let bytes_written =
previous_available.saturating_sub(self.append_vec().accounts.remaining_bytes());
assert_eq!(
bytes_written,
u64_align!(accounts_to_store.get_bytes(storage_selector)) as u64
);

(timing, bytes_written)
}
}

Expand Down Expand Up @@ -4587,8 +4605,10 @@ impl AccountsDb {
}

let mut stats_sub = ShrinkStatsSub::default();
let (mut shrink_in_progress, create_and_insert_store_elapsed_us) =
measure_us!(current_ancient.create_if_necessary(slot, self));
let mut bytes_remaining_to_write = shrink_collect.alive_total_bytes;
let (mut shrink_in_progress, create_and_insert_store_elapsed_us) = measure_us!(
current_ancient.create_if_necessary(slot, self, shrink_collect.alive_total_bytes)
);
stats_sub.create_and_insert_store_elapsed_us = create_and_insert_store_elapsed_us;
let available_bytes = current_ancient.append_vec().accounts.remaining_bytes();
// split accounts in 'slot' into:
Expand All @@ -4610,17 +4630,25 @@ impl AccountsDb {

let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
// write what we can to the current ancient storage
stats_sub.store_accounts_timing =
let (store_accounts_timing, bytes_written) =
current_ancient.store_ancient_accounts(self, &to_store, StorageSelector::Primary);
stats_sub.store_accounts_timing = store_accounts_timing;
bytes_remaining_to_write = bytes_remaining_to_write.saturating_sub(bytes_written as usize);

// handle accounts from 'slot' which did not fit into the current ancient append vec
if to_store.has_overflow() {
// We need a new ancient append vec at this slot.
// Assert: it cannot be the case that we already had an ancient append vec at this slot and
// yet that ancient append vec does not have room for the accounts stored at this slot currently
assert_ne!(slot, current_ancient.slot());
let (shrink_in_progress_overflow, time_us) =
measure_us!(current_ancient.create_ancient_append_vec(slot, self));

// Now we create an ancient append vec at `slot` to store the overflows.
let (shrink_in_progress_overflow, time_us) = measure_us!(current_ancient
.create_ancient_append_vec(
slot,
self,
to_store.get_bytes(StorageSelector::Overflow)
));
stats_sub.create_and_insert_store_elapsed_us += time_us;
// We cannot possibly be shrinking the original slot that created an ancient append vec
// AND not have enough room in the ancient append vec at that slot
Expand All @@ -4632,10 +4660,16 @@ impl AccountsDb {
shrink_in_progress = Some(shrink_in_progress_overflow);

// write the overflow accounts to the next ancient storage
let timing =
let (store_accounts_timing, bytes_written) =
current_ancient.store_ancient_accounts(self, &to_store, StorageSelector::Overflow);
stats_sub.store_accounts_timing.accumulate(&timing);
bytes_remaining_to_write =
bytes_remaining_to_write.saturating_sub(bytes_written as usize);

stats_sub
.store_accounts_timing
.accumulate(&store_accounts_timing);
}
assert_eq!(bytes_remaining_to_write, 0);
rewrite_elapsed.stop();
stats_sub.rewrite_elapsed_us = rewrite_elapsed.as_us();

Expand Down Expand Up @@ -9814,6 +9848,7 @@ pub mod tests {
accounts_index::{
tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
},
ancient_append_vecs,
append_vec::{test_utils::TempFile, AppendVecStoredAccountMeta},
cache_hash_data::CacheHashDataFile,
inline_spl_token,
Expand Down Expand Up @@ -9990,6 +10025,47 @@ pub mod tests {
}
}

#[test]
fn test_create_ancient_append_vec() {
let ancient_append_vec_size = ancient_append_vecs::get_ancient_append_vec_capacity();
let db = AccountsDb::new_single_for_tests();

{
// create an ancient appendvec from a small appendvec, the size of
// the ancient appendvec should be the size of the ideal ancient
// appendvec size.
let mut current_ancient = CurrentAncientAppendVec::default();
let slot0 = 0;

// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot0, 1000, "test");
let _ = current_ancient.create_ancient_append_vec(slot0, &db, 0);
assert_eq!(
current_ancient.append_vec().capacity(),
ancient_append_vec_size
);
}

{
// create an ancient appendvec from a large appendvec (bigger than
// current ancient_append_vec_size), the ancient appendvec should be
// the size of the bigger ancient appendvec size.
let mut current_ancient = CurrentAncientAppendVec::default();
let slot1 = 1;
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot1, 1000, "test");
let _ = current_ancient.create_ancient_append_vec(
slot1,
&db,
2 * ancient_append_vec_size as usize,
);
assert_eq!(
current_ancient.append_vec().capacity(),
2 * ancient_append_vec_size
);
}
}

#[test]
fn test_maybe_unref_accounts_already_in_ancient() {
let db = AccountsDb::new_single_for_tests();
Expand Down Expand Up @@ -10036,7 +10112,7 @@ pub mod tests {
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot0, 1000, "test");
{
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot0, &db);
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot0, &db, 0);
}
let mut ancient_slot_pubkeys = AncientSlotPubkeys::default();
assert!(ancient_slot_pubkeys.inner.is_none());
Expand All @@ -10051,7 +10127,7 @@ pub mod tests {
// different slot than current_ancient, so update 'ancient_slot_pubkeys'
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot1, 1000, "test");
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot1, &db);
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot1, &db, 0);
let slot2 = 2;
ancient_slot_pubkeys.maybe_unref_accounts_already_in_ancient(
slot2,
Expand Down Expand Up @@ -16875,7 +16951,7 @@ pub mod tests {
append_vec.append_vec_id()
);

let _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db);
let _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db, 0);
assert_eq!(current_ancient.slot(), slot);
assert_eq!(current_ancient.append_vec_id(), append_vec.append_vec_id());
}
Expand All @@ -16887,13 +16963,13 @@ pub mod tests {
let _existing_append_vec = db.create_and_insert_store(slot2, 1000, "test");

let mut current_ancient = CurrentAncientAppendVec::default();
let mut _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db);
let mut _shrink_in_progress = current_ancient.create_if_necessary(slot2, &db, 0);
let id = current_ancient.append_vec_id();
assert_eq!(current_ancient.slot(), slot2);
assert!(is_ancient(&current_ancient.append_vec().accounts));
let slot3 = 3;
// should do nothing
let _shrink_in_progress = current_ancient.create_if_necessary(slot3, &db);
let _shrink_in_progress = current_ancient.create_if_necessary(slot3, &db, 0);
assert_eq!(current_ancient.slot(), slot2);
assert_eq!(current_ancient.append_vec_id(), id);
assert!(is_ancient(&current_ancient.append_vec().accounts));
Expand All @@ -16907,7 +16983,7 @@ pub mod tests {
let _existing_append_vec = db.create_and_insert_store(slot2, 1000, "test");

{
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot2, &db);
let _shrink_in_progress = current_ancient.create_ancient_append_vec(slot2, &db, 0);
}
let id = current_ancient.append_vec_id();
assert_eq!(current_ancient.slot(), slot2);
Expand All @@ -16916,7 +16992,7 @@ pub mod tests {
// there has to be an existing append vec at this slot for a new current ancient at the slot to make sense
let _existing_append_vec = db.create_and_insert_store(slot3, 1000, "test");

let mut _shrink_in_progress = current_ancient.create_ancient_append_vec(slot3, &db);
let mut _shrink_in_progress = current_ancient.create_ancient_append_vec(slot3, &db, 0);
assert_eq!(current_ancient.slot(), slot3);
assert!(is_ancient(&current_ancient.append_vec().accounts));
assert_ne!(current_ancient.append_vec_id(), id);
Expand Down Expand Up @@ -17257,6 +17333,87 @@ pub mod tests {
);
}

#[test]
fn test_shrink_ancient_overflow_with_min_size() {
solana_logger::setup();

let ideal_av_size = ancient_append_vecs::get_ancient_append_vec_capacity();
let num_normal_slots = 2;

// build an ancient append vec at slot 'ancient_slot' with one `fat`
// account that's larger than the ideal size of ancient append vec to
// simulate the *oversized* append vec for shrinking.
let account_size = (1.5 * ideal_av_size as f64) as u64;
let (db, ancient_slot) = get_one_ancient_append_vec_and_others_with_account_size(
true,
num_normal_slots,
Some(account_size),
);

let max_slot_inclusive = ancient_slot + (num_normal_slots as Slot);
let initial_accounts = get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1));

let ancient = db.storage.get_slot_storage_entry(ancient_slot).unwrap();

// assert that the min_size, which about 1.5 * ideal_av_size, kicked in
// and result that the ancient append vec capacity exceeds the ideal_av_size
assert!(ancient.capacity() > ideal_av_size);

// combine 1 normal append vec into existing oversize ancient append vec.
db.combine_ancient_slots(
(ancient_slot..max_slot_inclusive).collect(),
CAN_RANDOMLY_SHRINK_FALSE,
);

compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..max_slot_inclusive),
);

// the append vec at max_slot_inclusive-1 should NOT have been removed
// since the append vec is already oversized and we created an ancient
// append vec there.
let ancient2 = db
.storage
.get_slot_storage_entry(max_slot_inclusive - 1)
.unwrap();
assert!(is_ancient(&ancient2.accounts));
assert!(ancient2.capacity() > ideal_av_size); // min_size kicked in, which cause the appendvec to be larger than the ideal_av_size

// Combine normal append vec(s) into existing ancient append vec this
// will overflow the original ancient append vec because of the oversized
// ancient append vec is full.
db.combine_ancient_slots(
(ancient_slot..=max_slot_inclusive).collect(),
CAN_RANDOMLY_SHRINK_FALSE,
);

compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1)),
);

// Nothing should be combined because the append vec are oversized.
// min_size kicked in, which cause the appendvecs to be larger than the ideal_av_size.
let ancient = db.storage.get_slot_storage_entry(ancient_slot).unwrap();
assert!(is_ancient(&ancient.accounts));
assert!(ancient.capacity() > ideal_av_size);

let ancient2 = db
.storage
.get_slot_storage_entry(max_slot_inclusive - 1)
.unwrap();
assert!(is_ancient(&ancient2.accounts));
assert!(ancient2.capacity() > ideal_av_size);

let ancient3 = db
.storage
.get_slot_storage_entry(max_slot_inclusive)
.unwrap();
assert!(is_ancient(&ancient3.accounts));
assert!(ancient3.capacity() > ideal_av_size);
}

#[test]
fn test_shrink_ancient_overflow() {
solana_logger::setup();
Expand Down Expand Up @@ -17608,11 +17765,13 @@ pub mod tests {
(db, slot1)
}

fn get_one_ancient_append_vec_and_others(
fn get_one_ancient_append_vec_and_others_with_account_size(
alive: bool,
num_normal_slots: usize,
account_data_size: Option<u64>,
) -> (AccountsDb, Slot) {
let (db, slot1) = create_db_with_storages_and_index(alive, num_normal_slots + 1, None);
let (db, slot1) =
create_db_with_storages_and_index(alive, num_normal_slots + 1, account_data_size);
let storage = db.get_storage_for_slot(slot1).unwrap();
let created_accounts = db.get_unique_accounts_from_storage(&storage);

Expand All @@ -17626,7 +17785,7 @@ pub mod tests {
capacity: after_capacity,
} = db.get_unique_accounts_from_storage(&after_store);
if alive {
assert_ne!(created_accounts.capacity, after_capacity);
assert!(created_accounts.capacity <= after_capacity);
} else {
assert_eq!(created_accounts.capacity, after_capacity);
}
Expand All @@ -17637,6 +17796,13 @@ pub mod tests {
(db, slot1)
}

fn get_one_ancient_append_vec_and_others(
alive: bool,
num_normal_slots: usize,
) -> (AccountsDb, Slot) {
get_one_ancient_append_vec_and_others_with_account_size(alive, num_normal_slots, None)
}

#[test]
fn test_handle_dropped_roots_for_ancient() {
solana_logger::setup();
Expand Down
Loading