Skip to content

Commit

Permalink
Stops ABS in wen_restart when generating a snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo committed Feb 10, 2025
1 parent a1a8344 commit 9dea61e
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 112 deletions.
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,7 @@ impl Validator {
WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
snapshot_config: config.snapshot_config.clone(),
accounts_background_request_sender: accounts_background_request_sender.clone(),
abs_status: accounts_background_service.status().clone(),
genesis_config_hash: genesis_config.hash(),
exit: exit.clone(),
})?;
Expand Down
267 changes: 156 additions & 111 deletions runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ impl AbsRequestHandlers {

pub struct AccountsBackgroundService {
t_background: JoinHandle<()>,
status: AbsStatus,
}

impl AccountsBackgroundService {
Expand All @@ -571,131 +572,141 @@ impl AccountsBackgroundService {
request_handlers: AbsRequestHandlers,
test_hash_calculation: bool,
) -> Self {
let is_running = Arc::new(AtomicBool::new(true));
let stop = Arc::new(AtomicBool::new(false));
let mut last_cleaned_block_height = 0;
let mut removed_slots_count = 0;
let mut total_remove_slots_time = 0;
let t_background = Builder::new()
.name("solBgAccounts".to_string())
.spawn(move || {
info!("AccountsBackgroundService has started");
let mut stats = StatsManager::new();
let mut last_snapshot_end_time = None;

loop {
if exit.load(Ordering::Relaxed) {
break;
}
let start_time = Instant::now();

// Grab the current root bank
let bank = bank_forks.read().unwrap().root_bank();

// Purge accounts of any dead slots
request_handlers
.pruned_banks_request_handler
.remove_dead_slots(
&bank,
&mut removed_slots_count,
&mut total_remove_slots_time,
);

let non_snapshot_time = last_snapshot_end_time
.map(|last_snapshot_end_time: Instant| {
last_snapshot_end_time.elapsed().as_micros()
})
.unwrap_or_default();

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
// slot `M` satisfies `M < N`
//
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
// but cleanup has already happened on some slot `M >= N`. Because the call to
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
// then that means in some *previous* iteration of this loop, we must have gotten a root
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
// snapshot request channel.
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_requests() will always look for the latest
// available snapshot in the channel.
//
// NOTE: We must wait for startup verification to complete before handling
// snapshot requests. This is because startup verification and snapshot
// request handling can both kick off accounts hash calculations in background
// threads, and these must not happen concurrently.
let snapshot_handle_result = bank
.is_startup_verification_complete()
.then(|| {
request_handlers.handle_snapshot_requests(
test_hash_calculation,
non_snapshot_time,
&exit,
)
})
.flatten();
if snapshot_handle_result.is_some() {
last_snapshot_end_time = Some(Instant::now());
}

// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();

if let Some(snapshot_handle_result) = snapshot_handle_result {
// Safe, see proof above
.spawn({
let is_running = is_running.clone();
let stop = stop.clone();

move || {
info!("AccountsBackgroundService has started");
let mut stats = StatsManager::new();
let mut last_snapshot_end_time = None;

loop {
if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
break;
}
let start_time = Instant::now();

// Grab the current root bank
let bank = bank_forks.read().unwrap().root_bank();

// Purge accounts of any dead slots
request_handlers
.pruned_banks_request_handler
.remove_dead_slots(
&bank,
&mut removed_slots_count,
&mut total_remove_slots_time,
);

let non_snapshot_time = last_snapshot_end_time
.map(|last_snapshot_end_time: Instant| {
last_snapshot_end_time.elapsed().as_micros()
})
.unwrap_or_default();

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
// slot `M` satisfies `M < N`
//
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
// but cleanup has already happened on some slot `M >= N`. Because the call to
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
// then that means in some *previous* iteration of this loop, we must have gotten a root
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
// snapshot request channel.
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_requests() will always look for the latest
// available snapshot in the channel.
//
// NOTE: We must wait for startup verification to complete before handling
// snapshot requests. This is because startup verification and snapshot
// request handling can both kick off accounts hash calculations in background
// threads, and these must not happen concurrently.
let snapshot_handle_result = bank
.is_startup_verification_complete()
.then(|| {
request_handlers.handle_snapshot_requests(
test_hash_calculation,
non_snapshot_time,
&exit,
)
})
.flatten();
if snapshot_handle_result.is_some() {
last_snapshot_end_time = Some(Instant::now());
}

match snapshot_handle_result {
Ok(snapshot_block_height) => {
assert!(last_cleaned_block_height <= snapshot_block_height);
last_cleaned_block_height = snapshot_block_height;
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();

if let Some(snapshot_handle_result) = snapshot_handle_result {
// Safe, see proof above

match snapshot_handle_result {
Ok(snapshot_block_height) => {
assert!(last_cleaned_block_height <= snapshot_block_height);
last_cleaned_block_height = snapshot_block_height;
}
Err(err) => {
error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
}
Err(err) => {
error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
exit.store(true, Ordering::Relaxed);
break;
} else {
if bank.block_height() - last_cleaned_block_height
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
{
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.force_flush_accounts_cache();
bank.clean_accounts();
last_cleaned_block_height = bank.block_height();
// See justification below for why we skip 'shrink' here.
if bank.is_startup_verification_complete() {
bank.shrink_ancient_slots();
}
}
}
} else {
if bank.block_height() - last_cleaned_block_height
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
{
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.force_flush_accounts_cache();
bank.clean_accounts();
last_cleaned_block_height = bank.block_height();
// See justification below for why we skip 'shrink' here.
// Do not 'shrink' until *after* the startup verification is complete.
// This is because startup verification needs to get the snapshot
// storages *as they existed at startup* (to calculate the accounts hash).
// If 'shrink' were to run, then it is possible startup verification
// (1) could race with 'shrink', and fail to assert that shrinking is not in
// progress, or (2) could get snapshot storages that were newer than what
// was in the snapshot itself.
if bank.is_startup_verification_complete() {
bank.shrink_ancient_slots();
bank.shrink_candidate_slots();
}
}
// Do not 'shrink' until *after* the startup verification is complete.
// This is because startup verification needs to get the snapshot
// storages *as they existed at startup* (to calculate the accounts hash).
// If 'shrink' were to run, then it is possible startup verification
// (1) could race with 'shrink', and fail to assert that shrinking is not in
// progress, or (2) could get snapshot storages that were newer than what
// was in the snapshot itself.
if bank.is_startup_verification_complete() {
bank.shrink_candidate_slots();
}
stats.record_and_maybe_submit(start_time.elapsed());
sleep(Duration::from_millis(INTERVAL_MS));
}
stats.record_and_maybe_submit(start_time.elapsed());
sleep(Duration::from_millis(INTERVAL_MS));
}
info!("AccountsBackgroundService has stopped");
})
info!("AccountsBackgroundService has stopped");
is_running.store(false, Ordering::Relaxed);
}})
.unwrap();

Self { t_background }
Self {
t_background,
status: AbsStatus { is_running, stop },
}
}

/// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
Expand Down Expand Up @@ -724,6 +735,40 @@ impl AccountsBackgroundService {
pub fn join(self) -> thread::Result<()> {
self.t_background.join()
}

/// Returns an object to query/manage the status of ABS
pub fn status(&self) -> &AbsStatus {
&self.status
}
}

/// Query and manage the status of AccountsBackgroundService
#[derive(Debug, Clone)]
pub struct AbsStatus {
/// Flag to query if ABS is running
is_running: Arc<AtomicBool>,
/// Flag to set to stop ABS
stop: Arc<AtomicBool>,
}

impl AbsStatus {
/// Returns if ABS is running
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Relaxed)
}

/// Raises the flag for ABS to stop
pub fn stop(&self) {
self.stop.store(true, Ordering::Relaxed)
}

#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests() -> Self {
Self {
is_running: Arc::new(AtomicBool::new(false)),
stop: Arc::new(AtomicBool::new(false)),
}
}
}

/// Get the AccountsPackageKind from a given SnapshotRequest
Expand Down
Loading

0 comments on commit 9dea61e

Please sign in to comment.