Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Move scheduler out of Bank
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed May 22, 2023
1 parent b44475b commit 81075ac
Show file tree
Hide file tree
Showing 24 changed files with 537 additions and 408 deletions.
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ fn main() {
);

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
Expand Down
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ raptorq = { workspace = true }
serde_json = { workspace = true }
serial_test = { workspace = true }
solana-logger = { workspace = true }
solana-poh = { workspace = true, features = ["test-in-workspace"] }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true, features = ["test-in-workspace"] }
solana-stake-program = { workspace = true }
static_assertions = { workspace = true }
systemstat = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use {
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{
genesis_config::GenesisConfig,
Expand Down Expand Up @@ -385,6 +386,7 @@ fn simulate_process_entries(
num_accounts: usize,
) {
let bank = Arc::new(Bank::new_for_benches(genesis_config));
let bank = BankWithScheduler::new_without_scheduler(bank);

for i in 0..(num_accounts / 2) {
bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey())
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
Expand Down
20 changes: 10 additions & 10 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ mod tests {
let recorder = poh_recorder.new_recorder();
let poh_recorder = Arc::new(RwLock::new(poh_recorder));

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

Expand Down Expand Up @@ -894,7 +894,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
Expand Down Expand Up @@ -1021,7 +1021,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
Expand Down Expand Up @@ -1093,7 +1093,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
Expand Down Expand Up @@ -1212,7 +1212,7 @@ mod tests {
let recorder = poh_recorder.new_recorder();
let poh_recorder = Arc::new(RwLock::new(poh_recorder));

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

Expand Down Expand Up @@ -1512,7 +1512,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);

let shreds = entries_to_test_shreds(
&entries,
Expand Down Expand Up @@ -1650,7 +1650,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);

let shreds = entries_to_test_shreds(
&entries,
Expand Down Expand Up @@ -1751,7 +1751,7 @@ mod tests {
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
Expand Down Expand Up @@ -1831,7 +1831,7 @@ mod tests {
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
consumer.consume_buffered_packets(
&bank_start,
Expand Down Expand Up @@ -1886,7 +1886,7 @@ mod tests {
// When the working bank in poh_recorder is Some, all packets should be processed
// except except for retryable errors. Manually take the lock of a transaction to
// simulate another thread processing a transaction with that lock.
poh_recorder.write().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank_for_test(&bank);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let lock_account = transactions[0].message.account_keys[1];
Expand Down
38 changes: 26 additions & 12 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
Expand Down Expand Up @@ -1864,14 +1865,14 @@ impl ReplayStage {
poh_recorder
.write()
.unwrap()
.set_bank(&tpu_bank, track_transaction_indexes);
.set_bank(tpu_bank, track_transaction_indexes);
} else {
error!("{} No next leader found", my_pubkey);
}
}

fn replay_blockstore_into_bank(
bank: &Arc<Bank>,
bank: &BankWithScheduler,
blockstore: &Blockstore,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
Expand Down Expand Up @@ -2438,7 +2439,11 @@ impl ReplayStage {
return replay_result;
}

let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
let bank = &bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
let parent_slot = bank.parent_slot();
let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
let stats = progress_lock
Expand Down Expand Up @@ -2524,7 +2529,11 @@ impl ReplayStage {
debug!("bank_slot {:?} is marked dead", bank_slot);
replay_result.is_slot_dead = true;
} else {
let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
let bank = &bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
let parent_slot = bank.parent_slot();
let prev_leader_slot = progress.get_bank_prev_leader_slot(bank);
let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
Expand Down Expand Up @@ -2605,7 +2614,11 @@ impl ReplayStage {
}

let bank_slot = replay_result.bank_slot;
let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
let bank = &bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
if let Some(replay_result) = &replay_result.replay_result {
match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count,
Expand Down Expand Up @@ -2695,7 +2708,9 @@ impl ReplayStage {
);
// report cost tracker stats
cost_update_sender
.send(CostUpdate::FrozenBank { bank: bank.clone() })
.send(CostUpdate::FrozenBank {
bank: bank.bank_cloned(),
})
.unwrap_or_else(|err| {
warn!("cost_update_sender failed sending bank stats: {:?}", err)
});
Expand Down Expand Up @@ -2734,7 +2749,7 @@ impl ReplayStage {
);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.send(BankNotification::Frozen(bank.bank_cloned()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
blockstore_processor::cache_block_meta(bank, cache_block_meta_sender);
Expand Down Expand Up @@ -4439,14 +4454,13 @@ pub(crate) mod tests {
assert!(bank0.is_frozen());
assert_eq!(bank0.tick_height(), bank0.max_tick_height());
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap();
let bank1 = bank_forks.write().unwrap().insert(bank1);
let bank1_progress = progress
.entry(bank1.slot())
.or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0));
let shreds = shred_to_insert(
&validator_keypairs.values().next().unwrap().node_keypair,
bank1.clone(),
bank1.bank_cloned(),
);
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
Expand Down Expand Up @@ -4527,7 +4541,7 @@ pub(crate) mod tests {
genesis_config.ticks_per_slot = 4;
let bank0 = Bank::new_for_tests(&genesis_config);
for _ in 0..genesis_config.ticks_per_slot {
bank0.register_tick(&Hash::default());
bank0.register_default_tick_for_test();
}
bank0.freeze();
let arc_bank0 = Arc::new(bank0);
Expand Down Expand Up @@ -4571,7 +4585,7 @@ pub(crate) mod tests {
&solana_sdk::pubkey::new_rand(),
);
for _ in 0..genesis_config.ticks_per_slot {
bank.register_tick(&Hash::default());
bank.register_default_tick_for_test();
}
bank_forks.write().unwrap().insert(bank);
let arc_bank = bank_forks.read().unwrap().get(i).unwrap();
Expand Down
11 changes: 9 additions & 2 deletions core/src/vote_simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
genesis_utils::{
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
},
installed_scheduler_pool::BankWithScheduler,
},
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signer},
solana_vote_program::vote_transaction,
Expand Down Expand Up @@ -115,7 +116,10 @@ impl VoteSimulator {
}
}
while new_bank.tick_height() < new_bank.max_tick_height() {
new_bank.register_tick(&Hash::new_unique());
new_bank.register_tick(
&Hash::new_unique(),
&BankWithScheduler::no_scheduler_available(),
);
}
if !visit.node().has_no_child() || is_frozen {
new_bank.freeze();
Expand Down Expand Up @@ -358,7 +362,10 @@ pub fn initialize_state(
}

while bank0.tick_height() < bank0.max_tick_height() {
bank0.register_tick(&Hash::new_unique());
bank0.register_tick(
&Hash::new_unique(),
&BankWithScheduler::no_scheduler_available(),
);
}
bank0.freeze();
let mut progress = ProgressMap::default();
Expand Down
6 changes: 3 additions & 3 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ fn goto_end_of_slot(bank: &mut Bank) {
let mut tick_hash = bank.last_blockhash();
loop {
tick_hash = hashv(&[tick_hash.as_ref(), &[42]]);
bank.register_tick(&tick_hash);
bank.register_tick_for_test(&tick_hash);
if tick_hash == bank.last_blockhash() {
bank.freeze();
return;
Expand Down Expand Up @@ -737,7 +737,7 @@ fn test_bank_forks_incremental_snapshot(
assert_eq!(bank.process_transaction(&tx), Ok(()));

while !bank.is_complete() {
bank.register_tick(&Hash::new_unique());
bank.register_unique_tick();
}

bank_forks.insert(bank)
Expand Down Expand Up @@ -1033,7 +1033,7 @@ fn test_snapshots_with_background_services(
assert_eq!(bank.process_transaction(&tx), Ok(()));

while !bank.is_complete() {
bank.register_tick(&Hash::new_unique());
bank.register_unique_tick();
}

bank_forks.write().unwrap().insert(bank);
Expand Down
2 changes: 1 addition & 1 deletion ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3429,7 +3429,7 @@ fn main() {

if child_bank_required {
while !bank.is_complete() {
bank.register_tick(&Hash::new_unique());
bank.register_unique_tick();
}
}

Expand Down
Loading

0 comments on commit 81075ac

Please sign in to comment.