diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 3b333aeb053e46..922faef5b7b8b6 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -538,7 +538,7 @@ fn main() { ); assert!(poh_recorder.read().unwrap().bank().is_none()); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); assert!(poh_recorder.read().unwrap().bank().is_some()); debug!( "new_bank_time: {}us insert_time: {}us poh_time: {}us", diff --git a/core/Cargo.toml b/core/Cargo.toml index 504fcd0c6da874..9cf3bec92591d3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -77,7 +77,9 @@ raptorq = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } solana-logger = { workspace = true } +solana-poh = { workspace = true, features = ["test-in-workspace"] } solana-program-runtime = { workspace = true } +solana-runtime = { workspace = true, features = ["test-in-workspace"] } solana-stake-program = { workspace = true } static_assertions = { workspace = true } systemstat = { workspace = true } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index bcb887e70e09b1..250bb86d1b0eaf 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -30,7 +30,8 @@ use { solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{ - bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + bank::Bank, bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler, + prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ genesis_config::GenesisConfig, @@ -385,6 +386,7 @@ fn simulate_process_entries( num_accounts: usize, ) { let bank = Arc::new(Bank::new_for_benches(genesis_config)); + let bank = BankWithScheduler::new_without_scheduler(bank); for i in 0..(num_accounts / 2) { bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey()) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e47f91918646f2..ee4408a2280753 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1018,7 +1018,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let pubkey = solana_sdk::pubkey::new_rand(); let keypair2 = Keypair::new(); let pubkey2 = solana_sdk::pubkey::new_rand(); diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 0d4c1490c6128a..82be3db93f865f 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -747,7 +747,7 @@ mod tests { let recorder = poh_recorder.new_recorder(); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -894,7 +894,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let committer = Committer::new( None, @@ -1021,7 +1021,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let committer = Committer::new( None, @@ -1093,7 +1093,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let committer = Committer::new( None, @@ -1212,7 +1212,7 @@ mod tests { let recorder = poh_recorder.new_recorder(); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -1512,7 +1512,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let shreds = entries_to_test_shreds( &entries, @@ -1650,7 +1650,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let shreds = entries_to_test_shreds( &entries, @@ -1751,7 +1751,7 @@ mod tests { assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); // When the working bank in poh_recorder is Some, all packets should be processed. // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); let banking_stage_stats = BankingStageStats::default(); consumer.consume_buffered_packets( @@ -1831,7 +1831,7 @@ mod tests { assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); // When the working bank in poh_recorder is Some, all packets should be processed. // Multi-Iterator will process them 1-by-1 if all txs are conflicting. - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); consumer.consume_buffered_packets( &bank_start, @@ -1886,7 +1886,7 @@ mod tests { // When the working bank in poh_recorder is Some, all packets should be processed // except except for retryable errors. Manually take the lock of a transaction to // simulate another thread processing a transaction with that lock. - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); let lock_account = transactions[0].message.account_keys[1]; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5613cb7e593d4e..7a75d434a3f865 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -60,6 +60,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, + installed_scheduler_pool::BankWithScheduler, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, @@ -1864,14 +1865,14 @@ impl ReplayStage { poh_recorder .write() .unwrap() - .set_bank(&tpu_bank, track_transaction_indexes); + .set_bank(tpu_bank, track_transaction_indexes); } else { error!("{} No next leader found", my_pubkey); } } fn replay_blockstore_into_bank( - bank: &Arc, + bank: &BankWithScheduler, blockstore: &Blockstore, replay_stats: &RwLock, replay_progress: &RwLock, @@ -2438,7 +2439,11 @@ impl ReplayStage { return replay_result; } - let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); let parent_slot = bank.parent_slot(); let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { let stats = progress_lock @@ -2524,7 +2529,11 @@ impl ReplayStage { debug!("bank_slot {:?} is marked dead", bank_slot); replay_result.is_slot_dead = true; } else { - let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); let parent_slot = bank.parent_slot(); let prev_leader_slot = progress.get_bank_prev_leader_slot(bank); let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { @@ -2605,7 +2614,11 @@ impl ReplayStage { } let bank_slot = replay_result.bank_slot; - let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); if let Some(replay_result) = &replay_result.replay_result { match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, @@ -2695,7 +2708,9 @@ impl ReplayStage { ); // report cost tracker stats cost_update_sender - .send(CostUpdate::FrozenBank { bank: bank.clone() }) + .send(CostUpdate::FrozenBank { + bank: bank.bank_cloned(), + }) .unwrap_or_else(|err| { warn!("cost_update_sender failed sending bank stats: {:?}", err) }); @@ -2734,7 +2749,7 @@ impl ReplayStage { ); if let Some(sender) = bank_notification_sender { sender - .send(BankNotification::Frozen(bank.clone())) + .send(BankNotification::Frozen(bank.bank_cloned())) .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } blockstore_processor::cache_block_meta(bank, cache_block_meta_sender); @@ -4439,14 +4454,13 @@ pub(crate) mod tests { assert!(bank0.is_frozen()); assert_eq!(bank0.tick_height(), bank0.max_tick_height()); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); - bank_forks.write().unwrap().insert(bank1); - let bank1 = bank_forks.read().unwrap().get(1).unwrap(); + let bank1 = bank_forks.write().unwrap().insert(bank1); let bank1_progress = progress .entry(bank1.slot()) .or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0)); let shreds = shred_to_insert( &validator_keypairs.values().next().unwrap().node_keypair, - bank1.clone(), + bank1.bank_cloned(), ); blockstore.insert_shreds(shreds, None, false).unwrap(); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); @@ -4527,7 +4541,7 @@ pub(crate) mod tests { genesis_config.ticks_per_slot = 4; let bank0 = Bank::new_for_tests(&genesis_config); for _ in 0..genesis_config.ticks_per_slot { - bank0.register_tick(&Hash::default()); + bank0.register_default_tick_for_test(); } bank0.freeze(); let arc_bank0 = Arc::new(bank0); @@ -4571,7 +4585,7 @@ pub(crate) mod tests { &solana_sdk::pubkey::new_rand(), ); for _ in 0..genesis_config.ticks_per_slot { - bank.register_tick(&Hash::default()); + bank.register_default_tick_for_test(); } bank_forks.write().unwrap().insert(bank); let arc_bank = bank_forks.read().unwrap().get(i).unwrap(); diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index e87bd2dab8ec0b..80673d301532fd 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -21,6 +21,7 @@ use { genesis_utils::{ create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }, + installed_scheduler_pool::BankWithScheduler, }, solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signer}, solana_vote_program::vote_transaction, @@ -115,7 +116,10 @@ impl VoteSimulator { } } while new_bank.tick_height() < new_bank.max_tick_height() { - new_bank.register_tick(&Hash::new_unique()); + new_bank.register_tick( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ); } if !visit.node().has_no_child() || is_frozen { new_bank.freeze(); @@ -358,7 +362,10 @@ pub fn initialize_state( } while bank0.tick_height() < bank0.max_tick_height() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_tick( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ); } bank0.freeze(); let mut progress = ProgressMap::default(); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 2a0688c7d8c41b..0bffd7ec5234b2 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -311,7 +311,7 @@ fn goto_end_of_slot(bank: &mut Bank) { let mut tick_hash = bank.last_blockhash(); loop { tick_hash = hashv(&[tick_hash.as_ref(), &[42]]); - bank.register_tick(&tick_hash); + bank.register_tick_for_test(&tick_hash); if tick_hash == bank.last_blockhash() { bank.freeze(); return; @@ -737,7 +737,7 @@ fn test_bank_forks_incremental_snapshot( assert_eq!(bank.process_transaction(&tx), Ok(())); while !bank.is_complete() { - bank.register_tick(&Hash::new_unique()); + bank.register_unique_tick(); } bank_forks.insert(bank) @@ -1033,7 +1033,7 @@ fn test_snapshots_with_background_services( assert_eq!(bank.process_transaction(&tx), Ok(())); while !bank.is_complete() { - bank.register_tick(&Hash::new_unique()); + bank.register_unique_tick(); } bank_forks.write().unwrap().insert(bank); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index f9b9345554bfab..95158302a56737 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -3429,7 +3429,7 @@ fn main() { if child_bank_required { while !bank.is_complete() { - bank.register_tick(&Hash::new_unique()); + bank.register_unique_tick(); } } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 4172c5136fc826..f762c71fbc1b1e 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -32,6 +32,7 @@ use { commitment::VOTE_THRESHOLD_SIZE, cost_model::CostModel, epoch_accounts_hash::EpochAccountsHash, + installed_scheduler_pool::BankWithScheduler, prioritization_fee_cache::PrioritizationFeeCache, rent_debits::RentDebits, runtime_config::RuntimeConfig, @@ -302,7 +303,7 @@ fn execute_batches_internal( } fn process_batches( - bank: &Arc, + bank: &BankWithScheduler, batches: &[TransactionBatchWithIndexes], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -310,13 +311,13 @@ fn process_batches( log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, ) -> Result<()> { - if !bank.with_scheduler() { + if !bank.has_installed_scheduler() { debug!( "process_batches()/rebatch_and_execute_batches({} batches)", batches.len() ); rebatch_and_execute_batches( - bank, + bank.bank(), batches, transaction_status_sender, replay_vote_sender, @@ -334,7 +335,7 @@ fn process_batches( } fn schedule_batches_for_execution( - bank: &Arc, + bank: &BankWithScheduler, batches: &[TransactionBatchWithIndexes], ) -> Result<()> { for TransactionBatchWithIndexes { @@ -475,7 +476,7 @@ fn rebatch_and_execute_batches( /// This method is for use testing against a single Bank, and assumes `Bank::transaction_count()` /// represents the number of transactions executed in this Bank pub fn process_entries_for_tests( - bank: &Arc, + bank: &BankWithScheduler, entries: Vec, randomize: bool, transaction_status_sender: Option<&TransactionStatusSender>, @@ -521,9 +522,23 @@ pub fn process_entries_for_tests( result } +pub fn process_entries_for_tests_without_scheduler( + bank: &Arc, + entries: Vec, + randomize: bool, +) -> Result<()> { + process_entries_for_tests( + &BankWithScheduler::new_without_scheduler(bank.clone()), + entries, + randomize, + None, + None, + ) +} + // Note: If randomize is true this will shuffle entries' transactions in-place. fn process_entries( - bank: &Arc, + bank: &BankWithScheduler, entries: &mut [ReplayEntry], randomize: bool, transaction_status_sender: Option<&TransactionStatusSender>, @@ -560,7 +575,7 @@ fn process_entries( )?; batches.clear(); for hash in &tick_hashes { - bank.register_tick(hash); + bank.with_scheduler_lock(|scheduler| bank.register_tick(hash, scheduler)); } tick_hashes.clear(); } @@ -636,9 +651,11 @@ fn process_entries( log_messages_bytes_limit, prioritization_fee_cache, )?; - for hash in tick_hashes { - bank.register_tick(hash); - } + bank.with_scheduler_lock(|scheduler| { + for hash in tick_hashes { + bank.register_tick(hash, scheduler) + } + }); Ok(()) } @@ -782,11 +799,16 @@ pub(crate) fn process_blockstore_for_bank_0( accounts_update_notifier, exit, ); + let bank0_slot = bank0.slot(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); info!("Processing ledger for slot 0..."); process_bank_0( - &bank_forks.read().unwrap().root_bank(), + &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank0_slot) + .unwrap(), blockstore, opts, &VerifyRecyclers::default(), @@ -955,7 +977,7 @@ fn verify_ticks( fn confirm_full_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -1114,7 +1136,7 @@ impl ConfirmationProgress { #[allow(clippy::too_many_arguments)] pub fn confirm_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, skip_verification: bool, @@ -1157,7 +1179,7 @@ pub fn confirm_slot( #[allow(clippy::too_many_arguments)] fn confirm_slot_entries( - bank: &Arc, + bank: &BankWithScheduler, slot_entries_load_result: (Vec, u64, bool), timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, @@ -1340,7 +1362,7 @@ fn confirm_slot_entries( // Special handling required for processing the entries in slot 0 fn process_bank_0( - bank0: &Arc, + bank0: &BankWithScheduler, blockstore: &Blockstore, opts: &ProcessOptions, recyclers: &VerifyRecyclers, @@ -1366,7 +1388,7 @@ fn process_bank_0( if blockstore.is_primary_access() { blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false); } - cache_block_meta(bank0, cache_block_meta_sender); + cache_block_meta(bank0.bank(), cache_block_meta_sender); } // Given a bank, add its children to the pending slots queue if those children slots are @@ -1539,7 +1561,7 @@ fn load_frozen_forks( // If there's a cluster confirmed root greater than our last // replayed root, then because the cluster confirmed root should // be descended from our last root, it must exist in `all_banks` - let cluster_root_bank = all_banks.get(&supermajority_root).unwrap(); + let cluster_root_bank = all_banks.get(&supermajority_root).unwrap().bank_cloned(); // cluster root must be a descendant of our root, otherwise something // is drastically wrong @@ -1573,7 +1595,7 @@ fn load_frozen_forks( } }) } else if blockstore.is_root(slot) { - Some(&bank) + Some(bank.bank_cloned()) } else { None } @@ -1586,7 +1608,7 @@ fn load_frozen_forks( let mut m = Measure::start("set_root"); root = new_root_bank.slot(); - leader_schedule_cache.set_root(new_root_bank); + leader_schedule_cache.set_root(&new_root_bank); let _ = bank_forks.write().unwrap().set_root( root, accounts_background_request_sender, @@ -1625,7 +1647,7 @@ fn load_frozen_forks( } process_next_slots( - &bank, + bank.bank(), &meta, blockstore, leader_schedule_cache, @@ -1715,7 +1737,7 @@ fn supermajority_root_from_vote_accounts( // if failed to play the slot fn process_single_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -1759,7 +1781,7 @@ fn process_single_slot( if blockstore.is_primary_access() { blockstore.insert_bank_hash(bank.slot(), bank.hash(), false); } - cache_block_meta(bank, cache_block_meta_sender); + cache_block_meta(bank.bank(), cache_block_meta_sender); Ok(()) } @@ -2651,7 +2673,7 @@ pub mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - process_entries_for_tests(&bank, slot_entries, true, None, None).unwrap(); + process_entries_for_tests_without_scheduler(&bank, slot_entries, true).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -2786,7 +2808,7 @@ pub mod tests { assert_eq!(bank.tick_height(), 0); let tick = next_entry(&genesis_config.hash(), 1, vec![]); assert_eq!( - process_entries_for_tests(&bank, vec![tick], true, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![tick], true), Ok(()) ); assert_eq!(bank.tick_height(), 1); @@ -2821,7 +2843,7 @@ pub mod tests { ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1, entry_2], true, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2], true), Ok(()) ); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); @@ -2877,12 +2899,10 @@ pub mod tests { ); assert_eq!( - process_entries_for_tests( + process_entries_for_tests_without_scheduler( &bank, vec![entry_1_to_mint, entry_2_to_3_mint_to_1], false, - None, - None, ), Ok(()) ); @@ -2949,12 +2969,10 @@ pub mod tests { ], ); - assert!(process_entries_for_tests( + assert!(process_entries_for_tests_without_scheduler( &bank, vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], false, - None, - None, ) .is_err()); @@ -3070,7 +3088,7 @@ pub mod tests { let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let bank = Arc::new(bank); - let result = process_entries_for_tests(&bank, vec![entry], false, None, None); + let result = process_entries_for_tests_without_scheduler(&bank, vec![entry], false); bank.freeze(); let blockhash_ok = bank.last_blockhash(); let bankhash_ok = bank.hash(); @@ -3116,7 +3134,7 @@ pub mod tests { let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let bank = Arc::new(bank); - let _result = process_entries_for_tests(&bank, vec![entry], false, None, None); + let _result = process_entries_for_tests_without_scheduler(&bank, vec![entry], false); bank.freeze(); assert_eq!(blockhash_ok, bank.last_blockhash()); @@ -3208,7 +3226,7 @@ pub mod tests { // keypair2=3 // keypair3=3 - assert!(process_entries_for_tests( + assert!(process_entries_for_tests_without_scheduler( &bank, vec![ entry_1_to_mint, @@ -3216,8 +3234,6 @@ pub mod tests { entry_conflict_itself, ], false, - None, - None, ) .is_err()); @@ -3265,7 +3281,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1, entry_2], true, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2], true), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -3326,7 +3342,7 @@ pub mod tests { }) .collect(); assert_eq!( - process_entries_for_tests(&bank, entries, true, None, None), + process_entries_for_tests_without_scheduler(&bank, entries, true), Ok(()) ); } @@ -3389,7 +3405,7 @@ pub mod tests { // Transfer lamports to each other let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); assert_eq!( - process_entries_for_tests(&bank, vec![entry], true, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry], true), Ok(()) ); bank.squash(); @@ -3438,7 +3454,7 @@ pub mod tests { let blockhash = bank.last_blockhash(); while blockhash == bank.last_blockhash() { - bank.register_tick(&Hash::default()); + bank.register_default_tick_for_test(); } // ensure bank can process 2 entries that do not have a common account and tick is registered @@ -3449,12 +3465,10 @@ pub mod tests { system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&tick.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests( + process_entries_for_tests_without_scheduler( &bank, vec![entry_1, tick, entry_2.clone()], true, - None, - None ), Ok(()) ); @@ -3466,7 +3480,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash()); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_3], true, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_3], true), Err(TransactionError::AccountNotFound) ); } @@ -3546,7 +3560,7 @@ pub mod tests { ); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1_to_mint], false, None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1_to_mint], false), Err(TransactionError::AccountInUse) ); @@ -3625,7 +3639,7 @@ pub mod tests { // Set up bank1 let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config)); - let bank0 = bank_forks.get(0).unwrap(); + let bank0 = bank_forks.get_with_scheduler(0).unwrap(); let opts = ProcessOptions { run_verification: true, accounts_db_test_hash_calculation: true, @@ -3746,7 +3760,8 @@ pub mod tests { }) .collect(); info!("paying iteration {}", i); - process_entries_for_tests(&bank, entries, true, None, None).expect("paying failed"); + process_entries_for_tests_without_scheduler(&bank, entries, true) + .expect("paying failed"); let entries: Vec<_> = (0..NUM_TRANSFERS) .step_by(NUM_TRANSFERS_PER_ENTRY) @@ -3769,17 +3784,16 @@ pub mod tests { .collect(); info!("refunding iteration {}", i); - process_entries_for_tests(&bank, entries, true, None, None).expect("refunding failed"); + process_entries_for_tests_without_scheduler(&bank, entries, true) + .expect("refunding failed"); // advance to next block - process_entries_for_tests( + process_entries_for_tests_without_scheduler( &bank, (0..bank.ticks_per_slot()) .map(|_| next_entry_mut(&mut hash, 1, vec![])) .collect::>(), true, - None, - None, ) .expect("process ticks failed"); @@ -3822,7 +3836,7 @@ pub mod tests { let entry = next_entry(&new_blockhash, 1, vec![tx]); entries.push(entry); - process_entries_for_tests(&bank0, entries, true, None, None).unwrap(); + process_entries_for_tests_without_scheduler(&bank0, entries, true).unwrap(); assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) } @@ -3988,8 +4002,13 @@ pub mod tests { .collect(); let entry = next_entry(&bank_1_blockhash, 1, vote_txs); let (replay_vote_sender, replay_vote_receiver) = crossbeam_channel::unbounded(); - let _ = - process_entries_for_tests(&bank1, vec![entry], true, None, Some(&replay_vote_sender)); + let _ = process_entries_for_tests( + &BankWithScheduler::new_without_scheduler(bank1), + vec![entry], + true, + None, + Some(&replay_vote_sender), + ); let successes: BTreeSet = replay_vote_receiver .try_iter() .map(|(vote_pubkey, ..)| vote_pubkey) @@ -4277,7 +4296,7 @@ pub mod tests { prev_entry_hash: Hash, ) -> result::Result<(), BlockstoreProcessorError> { confirm_slot_entries( - bank, + &BankWithScheduler::new_without_scheduler(bank.clone()), (slot_entries, 0, slot_full), &mut ConfirmationTiming::default(), &mut ConfirmationProgress::new(prev_entry_hash), @@ -4416,7 +4435,9 @@ pub mod tests { .. } = create_genesis_config(100 * LAMPORTS_PER_SOL); let genesis_hash = genesis_config.hash(); - let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let bank = BankWithScheduler::new_without_scheduler(Arc::new(Bank::new_for_tests( + &genesis_config, + ))); let mut timing = ConfirmationTiming::default(); let mut progress = ConfirmationProgress::new(genesis_hash); let amount = genesis_config.rent.minimum_balance(0); @@ -4571,12 +4592,8 @@ pub mod tests { .. } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); - let batch = bank.prepare_sanitized_batch(&txs); - let batch_with_indexes = TransactionBatchWithIndexes { - batch, - transaction_indexes: (0..txs.len()).collect(), - }; let mut mocked_scheduler = MockInstalledScheduler::new(); mocked_scheduler @@ -4595,7 +4612,13 @@ pub mod tests { .returning(|_| ()); Arc::new(mocked_pool) }); - bank.install_scheduler(Box::new(mocked_scheduler)); + let bank = BankWithScheduler::new_for_test(bank, Some(Box::new(mocked_scheduler))); + + let batch = bank.prepare_sanitized_batch(&txs); + let batch_with_indexes = TransactionBatchWithIndexes { + batch, + transaction_indexes: (0..txs.len()).collect(), + }; let mut batch_execution_timing = BatchExecutionTiming::default(); let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); diff --git a/poh/Cargo.toml b/poh/Cargo.toml index 361363364933c5..03a5b103d540be 100644 --- a/poh/Cargo.toml +++ b/poh/Cargo.toml @@ -28,6 +28,10 @@ matches = { workspace = true } rand = { workspace = true } solana-logger = { workspace = true } solana-perf = { workspace = true } +solana-runtime = { workspace = true, features = ["test-in-workspace"] } + +[features] +test-in-workspace = [] [lib] crate-type = ["lib"] diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 3abd9ae57f64ff..1d8de78de0f39b 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -26,7 +26,7 @@ use { }, solana_measure::{measure, measure_us}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, - solana_runtime::bank::Bank, + solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler}, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, saturating_add_assign, transaction::VersionedTransaction, @@ -256,7 +256,7 @@ impl PohRecorderBank { #[derive(Clone)] pub struct WorkingBank { - pub bank: Arc, + pub bank: BankWithScheduler, pub start: Arc, pub min_tick_height: u64, pub max_tick_height: u64, @@ -376,12 +376,12 @@ impl PohRecorder { } pub fn bank(&self) -> Option> { - self.working_bank.as_ref().map(|w| w.bank.clone()) + self.working_bank.as_ref().map(|w| w.bank.bank_cloned()) } pub fn bank_start(&self) -> Option { self.working_bank.as_ref().map(|w| BankStart { - working_bank: w.bank.clone(), + working_bank: w.bank.bank_cloned(), bank_creation_time: w.start.clone(), }) } @@ -573,9 +573,9 @@ impl PohRecorder { self.leader_last_tick_height = leader_last_tick_height; } - pub fn set_bank(&mut self, bank: &Arc, track_transaction_indexes: bool) { + pub fn set_bank(&mut self, bank: BankWithScheduler, track_transaction_indexes: bool) { assert!(self.working_bank.is_none()); - self.leader_bank_notifier.set_in_progress(bank); + self.leader_bank_notifier.set_in_progress(bank.bank()); let working_bank = WorkingBank { bank: bank.clone(), start: Arc::new(Instant::now()), @@ -595,7 +595,7 @@ impl PohRecorder { "resetting poh due to hashes per tick change detected at {}", working_bank.bank.slot() ); - self.reset_poh(working_bank.clone().bank, false); + self.reset_poh(working_bank.clone().bank.bank_cloned(), false); } } self.working_bank = Some(working_bank); @@ -619,6 +619,14 @@ impl PohRecorder { let _ = self.flush_cache(false); } + #[cfg(any(test, feature = "test-in-workspace"))] + pub fn set_bank_for_test(&mut self, bank: &Arc) { + self.set_bank( + BankWithScheduler::new_without_scheduler(bank.clone()), + false, + ) + } + // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height // On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record // occurs after the min_tick_height was generated @@ -654,8 +662,12 @@ impl PohRecorder { ); for tick in &self.tick_cache[..entry_count] { - working_bank.bank.register_tick(&tick.0.hash); - send_result = self.sender.send((working_bank.bank.clone(), tick.clone())); + working_bank.bank.with_scheduler_lock(|scheduler| { + working_bank.bank.register_tick(&tick.0.hash, scheduler) + }); + send_result = self + .sender + .send((working_bank.bank.bank_cloned(), tick.clone())); if send_result.is_err() { break; } @@ -667,7 +679,7 @@ impl PohRecorder { working_bank.max_tick_height, working_bank.bank.slot() ); - self.start_bank = working_bank.bank.clone(); + self.start_bank = working_bank.bank.bank_cloned(); let working_slot = self.start_slot(); self.start_tick_height = working_slot * self.ticks_per_slot + 1; self.clear_bank(); @@ -877,7 +889,7 @@ impl PohRecorder { hash: poh_entry.hash, transactions, }; - let bank_clone = working_bank.bank.clone(); + let bank_clone = working_bank.bank.bank_cloned(); self.sender.send((bank_clone, (entry, self.tick_height))) }, "send_poh_entry", @@ -1057,7 +1069,10 @@ pub fn create_test_recorder( &poh_config, exit.clone(), ); - poh_recorder.set_bank(bank, false); + poh_recorder.set_bank( + BankWithScheduler::new_without_scheduler(bank.clone()), + false, + ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( @@ -1195,7 +1210,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); assert!(poh_recorder.working_bank.is_some()); poh_recorder.clear_bank(); assert!(poh_recorder.working_bank.is_none()); @@ -1229,7 +1244,7 @@ mod tests { let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); // Set a working bank - poh_recorder.set_bank(&bank1, false); + poh_recorder.set_bank_for_test(&bank1); // Tick until poh_recorder.tick_height == working bank's min_tick_height let num_new_ticks = bank1.tick_height() - poh_recorder.tick_height(); @@ -1298,7 +1313,7 @@ mod tests { ); assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 1); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); poh_recorder.tick(); assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 2); @@ -1339,7 +1354,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1, false); + poh_recorder.set_bank_for_test(&bank1); // Let poh_recorder tick up to bank1.tick_height() - 1 for _ in 0..bank1.tick_height() - 1 { poh_recorder.tick() @@ -1380,7 +1395,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); let tx = test_tx(); let h1 = hash(b"hello world!"); @@ -1424,7 +1439,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1, false); + poh_recorder.set_bank_for_test(&bank1); // Record up to exactly min tick height let min_tick_height = poh_recorder.working_bank.as_ref().unwrap().min_tick_height; @@ -1478,7 +1493,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height; for _ in 0..num_ticks_to_max { poh_recorder.tick(); @@ -1518,7 +1533,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, true); + poh_recorder.set_bank_for_test(&bank); poh_recorder.tick(); assert_eq!( poh_recorder @@ -1592,7 +1607,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1, false); + poh_recorder.set_bank_for_test(&bank1); // Check we can make two ticks without hitting min_tick_height let remaining_ticks_to_min = @@ -1740,7 +1755,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); assert_eq!(bank.slot(), 0); poh_recorder.reset(bank, Some((4, 4))); assert!(poh_recorder.working_bank.is_none()); @@ -1772,7 +1787,7 @@ mod tests { None, Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); poh_recorder.clear_bank(); assert!(receiver.try_recv().is_ok()); } @@ -1807,7 +1822,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); // Simulate ticking much further than working_bank.max_tick_height let max_tick_height = poh_recorder.working_bank.as_ref().unwrap().max_tick_height; @@ -2102,7 +2117,7 @@ mod tests { // Move the bank up a slot (so that max_tick_height > slot 0's tick_height) let bank = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 1)); // If we set the working bank, the node should be leader within next 2 slots - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); assert!(poh_recorder.would_be_leader(2 * bank.ticks_per_slot())); } } @@ -2136,7 +2151,7 @@ mod tests { for _ in 0..(bank.ticks_per_slot() * 3) { poh_recorder.tick(); } - poh_recorder.set_bank(&bank, false); + poh_recorder.set_bank_for_test(&bank); assert!(!bank.is_hash_valid_for_age(&genesis_hash, 0)); assert!(bank.is_hash_valid_for_age(&genesis_hash, 1)); } diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index 9a72ad837bc238..695c5ac6977fa6 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -500,7 +500,7 @@ mod tests { hashes_per_batch, record_receiver, ); - poh_recorder.write().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank_for_test(&bank); // get some events let mut hashes = 0; diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 3d41533aba6920..8e5e6cac06b1e1 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -22,6 +22,7 @@ use { commitment::BlockCommitmentCache, epoch_accounts_hash::EpochAccountsHash, genesis_utils::{create_genesis_config_with_leader_ex, GenesisConfigInfo}, + installed_scheduler_pool::BankWithScheduler, runtime_config::RuntimeConfig, }, solana_sdk::{ @@ -879,7 +880,10 @@ impl ProgramTest { .read() .unwrap() .working_bank() - .register_recent_blockhash(&Hash::new_unique()); + .register_recent_blockhash( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ); } }); @@ -1031,7 +1035,10 @@ impl ProgramTestContext { .read() .unwrap() .working_bank() - .register_recent_blockhash(&Hash::new_unique()); + .register_recent_blockhash( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ); } }), ); @@ -1120,13 +1127,15 @@ impl ProgramTestContext { bank.freeze(); bank } else { - bank_forks.insert(Bank::warp_from_parent( - &bank, - &Pubkey::default(), - pre_warp_slot, - // some warping tests cannot use the append vecs because of the sequence of adding roots and flushing - solana_runtime::accounts_db::CalcAccountsHashDataSource::IndexForTests, - )) + bank_forks + .insert(Bank::warp_from_parent( + &bank, + &Pubkey::default(), + pre_warp_slot, + // some warping tests cannot use the append vecs because of the sequence of adding roots and flushing + solana_runtime::accounts_db::CalcAccountsHashDataSource::IndexForTests, + )) + .bank_cloned() }; let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index d77b4a5c574b04..a437bf7ea95a48 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -48,6 +48,7 @@ use { commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots}, inline_spl_token::{SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET}, inline_spl_token_2022::{self, ACCOUNTTYPE_ACCOUNT}, + installed_scheduler_pool::BankWithScheduler, non_circulating_supply::calculate_non_circulating_supply, prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, @@ -4584,7 +4585,7 @@ pub fn populate_blockstore_for_tests( // that they are matched properly by get_rooted_block assert_eq!( solana_ledger::blockstore_processor::process_entries_for_tests( - &bank, + &BankWithScheduler::new_without_scheduler(bank), entries, true, Some( @@ -4936,7 +4937,12 @@ pub mod tests { return; } - let mut parent_bank = self.bank_forks.read().unwrap().working_bank(); + let mut parent_bank = self + .bank_forks + .read() + .unwrap() + .working_bank_with_scheduler() + .clone(); for (i, root) in roots.iter().enumerate() { let new_bank = Bank::new_from_parent(&parent_bank, parent_bank.collector_id(), *root); @@ -4994,7 +5000,7 @@ pub mod tests { CommitmentSlots::new_from_slot(self.bank_forks.read().unwrap().highest_slot()), ); *self.block_commitment_cache.write().unwrap() = new_block_commitment; - bank + bank.bank_cloned() } fn store_vote_account(&self, vote_pubkey: &Pubkey, vote_state: VoteState) { diff --git a/runtime/benches/bank.rs b/runtime/benches/bank.rs index dcb7a0d9148c80..df81a88a573573 100644 --- a/runtime/benches/bank.rs +++ b/runtime/benches/bank.rs @@ -7,7 +7,7 @@ use { log::*, solana_program_runtime::invoke_context::InvokeContext, solana_runtime::{ - bank::{test_utils::goto_end_of_slot, *}, + bank::{test_utils::goto_end_of_slot_without_scheduler, *}, bank_client::BankClient, loader_utils::create_invoke_instruction, }, @@ -189,7 +189,7 @@ fn bench_bank_async_process_native_loader_transactions(bencher: &mut Bencher) { fn bench_bank_update_recent_blockhashes(bencher: &mut Bencher) { let (genesis_config, _mint_keypair) = create_genesis_config(100); let mut bank = Arc::new(Bank::new_for_benches(&genesis_config)); - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); let genesis_hash = bank.last_blockhash(); // Prime blockhash_queue for i in 0..(MAX_RECENT_BLOCKHASHES + 1) { @@ -198,7 +198,7 @@ fn bench_bank_update_recent_blockhashes(bencher: &mut Bencher) { &Pubkey::default(), (i + 1) as u64, )); - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); } // Verify blockhash_queue is full (genesis hash has been kicked out) assert!(!bank.is_hash_valid_for_age(&genesis_hash, MAX_RECENT_BLOCKHASHES)); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d37b3a88b7c744..9ef7faa63c5e3f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -60,7 +60,7 @@ use { epoch_accounts_hash::{self, EpochAccountsHash}, epoch_stakes::{EpochStakes, NodeVoteAccounts}, inline_spl_associated_token_account, inline_spl_token, - installed_scheduler_pool::InstalledSchedulerBoxInBank, + installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerRwLock}, message_processor::MessageProcessor, rent_collector::{CollectedInfo, RentCollector}, rent_debits::RentDebits, @@ -831,7 +831,6 @@ impl PartialEq for Bank { fee_structure: _, incremental_snapshot_persistence: _, loaded_programs_cache: _, - scheduler: _, // Ignore new fields explicitly if they do not impact PartialEq. // Adding ".." will remove compile-time checks that if a new field // is added to the struct, this PartialEq is accordingly updated. @@ -1101,7 +1100,6 @@ pub struct Bank { /// true when the bank's freezing or destruction has completed bank_freeze_or_destruction_incremented: AtomicBool, - pub(crate) scheduler: RwLock, } struct VoteWithStakeDelegations { @@ -1323,7 +1321,6 @@ impl Bank { accounts_data_size_delta_off_chain: AtomicI64::new(0), fee_structure: FeeStructure::default(), loaded_programs_cache: Arc::>::default(), - scheduler: RwLock::::default(), }; bank.bank_created(); @@ -1475,10 +1472,6 @@ impl Bank { let mut time = Measure::start("bank::new_from_parent"); let NewBankOptions { vote_only_bank } = new_bank_options; - // there should be no active scheduler at this point, which - // might be actively mutating bank state... - assert!(!parent.with_scheduler()); - parent.freeze(); assert_ne!(slot, parent.slot()); @@ -1626,7 +1619,6 @@ impl Bank { accounts_data_size_delta_off_chain: AtomicI64::new(0), fee_structure: parent.fee_structure.clone(), loaded_programs_cache: parent.loaded_programs_cache.clone(), - scheduler: RwLock::::default(), }; let (_, ancestors_time_us) = measure_us!({ @@ -1953,7 +1945,6 @@ impl Bank { accounts_data_size_delta_off_chain: AtomicI64::new(0), fee_structure: FeeStructure::default(), loaded_programs_cache: Arc::>::default(), - scheduler: RwLock::::default(), }; bank.bank_created(); @@ -3682,11 +3673,15 @@ impl Bank { /// Register a new recent blockhash in the bank's recent blockhash queue. Called when a bank /// reaches its max tick height. Can be called by tests to get new blockhashes for transaction /// processing without advancing to a new bank slot. - pub fn register_recent_blockhash(&self, blockhash: &Hash) { + pub fn register_recent_blockhash( + &self, + blockhash: &Hash, + scheduler: &InstalledSchedulerRwLock, + ) { // This is needed until we activate fix_recent_blockhashes because intra-slot // recent_blockhash updates necessitates synchronization for consistent tx check_age // handling. - self.wait_for_reusable_scheduler(); + BankWithScheduler::wait_for_reusable_scheduler(self, scheduler); // Only acquire the write lock for the blockhash queue on block boundaries because // readers can starve this write lock acquisition and ticks would be slowed down too // much if the write lock is acquired for each tick. @@ -3701,7 +3696,7 @@ impl Bank { /// /// This is NOT thread safe because if tick height is updated by two different threads, the /// block boundary condition could be missed. - pub fn register_tick(&self, hash: &Hash) { + pub fn register_tick(&self, hash: &Hash, scheduler: &InstalledSchedulerRwLock) { assert!( !self.freeze_started(), "register_tick() working on a bank that is already frozen or is undergoing freezing!" @@ -3709,7 +3704,7 @@ impl Bank { inc_new_counter_debug!("bank-register_tick-registered", 1); if self.is_block_boundary(self.tick_height.load(Relaxed) + 1) { - self.register_recent_blockhash(hash); + self.register_recent_blockhash(hash, scheduler); } // ReplayStage will start computing the accounts delta hash when it @@ -3720,6 +3715,26 @@ impl Bank { self.tick_height.fetch_add(1, Relaxed); } + #[cfg(any(test, feature = "test-in-workspace"))] + pub fn register_tick_for_test(&self, hash: &Hash) { + self.register_tick(hash, &BankWithScheduler::no_scheduler_available()) + } + + #[cfg(any(test, feature = "test-in-workspace"))] + pub fn register_default_tick_for_test(&self) { + self.register_tick( + &Hash::default(), + &BankWithScheduler::no_scheduler_available(), + ) + } + + pub fn register_unique_tick(&self) { + self.register_tick( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ) + } + pub fn is_complete(&self) -> bool { self.tick_height() == self.max_tick_height() } @@ -7810,7 +7825,10 @@ impl Bank { if self.tick_height.load(Relaxed) < self.max_tick_height { let last_blockhash = self.last_blockhash(); while self.last_blockhash() == last_blockhash { - self.register_tick(&Hash::new_unique()) + self.register_tick( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ) } } else { warn!("Bank already reached max tick height, cannot fill it with more ticks"); @@ -8296,7 +8314,6 @@ impl TotalAccountsStats { impl Drop for Bank { fn drop(&mut self) { - self.drop_scheduler(); self.bank_frozen_or_destroyed(); if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() { drop_callback.callback(self); @@ -8314,14 +8331,19 @@ impl Drop for Bank { pub mod test_utils { use { super::Bank, + crate::installed_scheduler_pool::BankWithScheduler, solana_sdk::{hash::hashv, pubkey::Pubkey}, solana_vote_program::vote_state::{self, BlockTimestamp, VoteStateVersions}, + std::sync::Arc, }; - pub fn goto_end_of_slot(bank: &mut Bank) { + + pub fn goto_end_of_slot(bank: &BankWithScheduler) { let mut tick_hash = bank.last_blockhash(); loop { tick_hash = hashv(&[tick_hash.as_ref(), &[42]]); - bank.register_tick(&tick_hash); + bank.with_scheduler_lock(|scheduler| { + bank.register_tick(&tick_hash, scheduler); + }); if tick_hash == bank.last_blockhash() { bank.freeze(); return; @@ -8329,6 +8351,10 @@ pub mod test_utils { } } + pub fn goto_end_of_slot_without_scheduler(bank: &Arc) { + goto_end_of_slot(&BankWithScheduler::new_without_scheduler(bank.clone())) + } + pub fn update_vote_account_timestamp( timestamp: BlockTimestamp, bank: &Bank, diff --git a/runtime/src/bank/tests.rs b/runtime/src/bank/tests.rs index 802fcd9b8317d1..63a4b9cf4c9183 100644 --- a/runtime/src/bank/tests.rs +++ b/runtime/src/bank/tests.rs @@ -3,7 +3,7 @@ use solana_sdk::sysvar::fees::Fees; use { super::{ - test_utils::{goto_end_of_slot, update_vote_account_timestamp}, + test_utils::{goto_end_of_slot_without_scheduler, update_vote_account_timestamp}, *, }, crate::{ @@ -148,7 +148,7 @@ fn test_race_register_tick_freeze() { let register_tick_thread = Builder::new() .name("register_tick".to_string()) .spawn(move || { - bank0_.register_tick(&hash); + bank0_.register_tick_for_test(&hash); }) .unwrap(); @@ -3113,7 +3113,7 @@ fn test_bank_tx_fee() { let (expected_fee_collected, expected_fee_burned) = genesis_config.fee_rate_governor.burn(expected_fee_paid); - let mut bank = Bank::new_for_tests(&genesis_config); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let capitalization = bank.capitalization(); @@ -3134,7 +3134,7 @@ fn test_bank_tx_fee() { ); assert_eq!(bank.get_balance(&leader), initial_balance); - goto_end_of_slot(&mut bank); + goto_end_of_slot_without_scheduler(&bank); assert_eq!(bank.signature_count(), 1); assert_eq!( bank.get_balance(&leader), @@ -3162,7 +3162,7 @@ fn test_bank_tx_fee() { ); // Verify that an InstructionError collects fees, too - let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1); + let bank = Arc::new(Bank::new_from_parent(&bank, &leader, 1)); let mut tx = system_transaction::transfer(&mint_keypair, &key, 1, bank.last_blockhash()); // Create a bogus instruction to system_program to cause an instruction error tx.message.instructions[0].data[0] = 40; @@ -3174,7 +3174,7 @@ fn test_bank_tx_fee() { bank.get_balance(&mint_keypair.pubkey()), mint - arbitrary_transfer_amount - 2 * expected_fee_paid ); // mint_keypair still pays a fee - goto_end_of_slot(&mut bank); + goto_end_of_slot_without_scheduler(&bank); assert_eq!(bank.signature_count(), 1); // Profit! 2 transaction signatures processed at 3 lamports each @@ -3230,7 +3230,7 @@ fn test_bank_tx_compute_unit_fee() { let (expected_fee_collected, expected_fee_burned) = genesis_config.fee_rate_governor.burn(expected_fee_paid); - let mut bank = Bank::new_for_tests(&genesis_config); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let capitalization = bank.capitalization(); @@ -3250,7 +3250,7 @@ fn test_bank_tx_compute_unit_fee() { ); assert_eq!(bank.get_balance(&leader), initial_balance); - goto_end_of_slot(&mut bank); + goto_end_of_slot_without_scheduler(&bank); assert_eq!(bank.signature_count(), 1); assert_eq!( bank.get_balance(&leader), @@ -3278,7 +3278,7 @@ fn test_bank_tx_compute_unit_fee() { ); // Verify that an InstructionError collects fees, too - let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1); + let bank = Arc::new(Bank::new_from_parent(&bank, &leader, 1)); let mut tx = system_transaction::transfer(&mint_keypair, &key, 1, bank.last_blockhash()); // Create a bogus instruction to system_program to cause an instruction error tx.message.instructions[0].data[0] = 40; @@ -3290,7 +3290,7 @@ fn test_bank_tx_compute_unit_fee() { bank.get_balance(&mint_keypair.pubkey()), mint - arbitrary_transfer_amount - 2 * expected_fee_paid ); // mint_keypair still pays a fee - goto_end_of_slot(&mut bank); + goto_end_of_slot_without_scheduler(&bank); assert_eq!(bank.signature_count(), 1); // Profit! 2 transaction signatures processed at 3 lamports each @@ -3328,19 +3328,19 @@ fn test_bank_blockhash_fee_structure() { .target_lamports_per_signature = 5000; genesis_config.fee_rate_governor.target_signatures_per_slot = 0; - let mut bank = Bank::new_for_tests(&genesis_config); - goto_end_of_slot(&mut bank); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + goto_end_of_slot_without_scheduler(&bank); let cheap_blockhash = bank.last_blockhash(); let cheap_lamports_per_signature = bank.get_lamports_per_signature(); assert_eq!(cheap_lamports_per_signature, 0); - let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1); - goto_end_of_slot(&mut bank); + let bank = Arc::new(Bank::new_from_parent(&bank, &leader, 1)); + goto_end_of_slot_without_scheduler(&bank); let expensive_blockhash = bank.last_blockhash(); let expensive_lamports_per_signature = bank.get_lamports_per_signature(); assert!(cheap_lamports_per_signature < expensive_lamports_per_signature); - let bank = Bank::new_from_parent(&Arc::new(bank), &leader, 2); + let bank = Bank::new_from_parent(&bank, &leader, 2); // Send a transfer using cheap_blockhash let key = solana_sdk::pubkey::new_rand(); @@ -3380,19 +3380,19 @@ fn test_bank_blockhash_compute_unit_fee_structure() { .target_lamports_per_signature = 1000; genesis_config.fee_rate_governor.target_signatures_per_slot = 1; - let mut bank = Bank::new_for_tests(&genesis_config); - goto_end_of_slot(&mut bank); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + goto_end_of_slot_without_scheduler(&bank); let cheap_blockhash = bank.last_blockhash(); let cheap_lamports_per_signature = bank.get_lamports_per_signature(); assert_eq!(cheap_lamports_per_signature, 0); - let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1); - goto_end_of_slot(&mut bank); + let bank = Arc::new(Bank::new_from_parent(&bank, &leader, 1)); + goto_end_of_slot_without_scheduler(&bank); let expensive_blockhash = bank.last_blockhash(); let expensive_lamports_per_signature = bank.get_lamports_per_signature(); assert!(cheap_lamports_per_signature < expensive_lamports_per_signature); - let bank = Bank::new_from_parent(&Arc::new(bank), &leader, 2); + let bank = Bank::new_from_parent(&bank, &leader, 2); // Send a transfer using cheap_blockhash let key = solana_sdk::pubkey::new_rand(); @@ -4606,7 +4606,7 @@ fn test_is_delta_true() { assert!(!bank1.is_delta.load(Relaxed)); assert_ne!(hash1, bank.hash()); // ticks don't make a bank into a delta or change its state unless a block boundary is crossed - bank1.register_tick(&Hash::default()); + bank1.register_default_tick_for_test(); assert!(!bank1.is_delta.load(Relaxed)); assert_eq!(bank1.hash_internal_state(), hash1); } @@ -5268,7 +5268,7 @@ fn test_recent_blockhashes_sysvar() { let most_recent_hash = recent_blockhashes.iter().next().unwrap().blockhash; // Check order assert!(bank.is_hash_valid_for_age(&most_recent_hash, 0)); - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } } @@ -5276,8 +5276,8 @@ fn test_recent_blockhashes_sysvar() { #[allow(deprecated)] #[test] fn test_blockhash_queue_sysvar_consistency() { - let mut bank = create_simple_test_arc_bank(100_000); - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + let bank = create_simple_test_arc_bank(100_000); + goto_end_of_slot_without_scheduler(&bank); let bhq_account = bank.get_account(&sysvar::recent_blockhashes::id()).unwrap(); let recent_blockhashes = @@ -5310,7 +5310,7 @@ fn test_ticks_change_state() { // ticks don't change its state unless a block boundary is crossed for _ in 0..genesis_config.ticks_per_slot { assert_eq!(bank1.hash_internal_state(), hash1); - bank1.register_tick(&Hash::default()); + bank1.register_default_tick_for_test(); } assert_ne!(bank1.hash_internal_state(), hash1); } @@ -5435,7 +5435,7 @@ where // Banks 0 and 1 have no fees, wait two blocks before // initializing our nonce accounts for _ in 0..2 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -5449,7 +5449,7 @@ where // The setup nonce is not valid to be used until the next bank // so wait one more block - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); Ok((bank, mint_keypair, custodian_keypair, nonce_keypair)) @@ -5704,7 +5704,7 @@ fn test_nonce_transaction() { /* Kick nonce hash off the blockhash_queue */ for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -5773,7 +5773,7 @@ fn test_nonce_transaction() { /* Kick nonce hash off the blockhash_queue */ for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -5831,7 +5831,7 @@ fn test_nonce_transaction_with_tx_wide_caps() { /* Kick nonce hash off the blockhash_queue */ for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -5900,7 +5900,7 @@ fn test_nonce_transaction_with_tx_wide_caps() { /* Kick nonce hash off the blockhash_queue */ for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -5968,7 +5968,7 @@ fn test_nonce_authority() { let nonce_hash = get_nonce_blockhash(&bank, &nonce_pubkey).unwrap(); for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -6026,7 +6026,7 @@ fn test_nonce_payer() { let nonce_hash = get_nonce_blockhash(&bank, &nonce_pubkey).unwrap(); for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -6091,7 +6091,7 @@ fn test_nonce_payer_tx_wide_cap() { let nonce_hash = get_nonce_blockhash(&bank, &nonce_pubkey).unwrap(); for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -6159,7 +6159,7 @@ fn test_nonce_fee_calculator_updates() { // Kick nonce hash off the blockhash_queue for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -6227,7 +6227,7 @@ fn test_nonce_fee_calculator_updates_tx_wide_cap() { // Kick nonce hash off the blockhash_queue for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } @@ -6307,7 +6307,7 @@ fn test_check_ro_durable_nonce_fails() { ); // Kick nonce hash off the blockhash_queue for _ in 0..MAX_RECENT_BLOCKHASHES + 1 { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); bank = Arc::new(new_from_parent(&bank)); } // Caught by the runtime because it is a nonce transaction @@ -6918,7 +6918,7 @@ fn test_bank_hash_consistency() { // Check a few slots, cross an epoch boundary assert_eq!(bank.get_slots_in_epoch(0), 32); loop { - goto_end_of_slot(Arc::get_mut(&mut bank).unwrap()); + goto_end_of_slot_without_scheduler(&bank); if bank.slot == 0 { assert_eq!( bank.hash().to_string(), @@ -6989,13 +6989,13 @@ fn get_shrink_account_size() -> usize { // Set root for bank 0, with caching disabled so we can get the size // of the storage for this slot - let mut bank0 = Arc::new(Bank::new_with_config_for_tests( + let bank0 = Arc::new(Bank::new_with_config_for_tests( &genesis_config, AccountSecondaryIndexes::default(), AccountShrinkThreshold::default(), )); bank0.restore_old_behavior_for_fragile_tests(); - goto_end_of_slot(Arc::::get_mut(&mut bank0).unwrap()); + goto_end_of_slot_without_scheduler(&bank0); bank0.freeze(); bank0.squash(); add_root_and_flush_write_cache(&bank0); @@ -7028,7 +7028,7 @@ fn test_clean_nonrooted() { info!("pubkey1: {}", pubkey1); // Set root for bank 0, with caching enabled - let mut bank0 = Arc::new(Bank::new_with_config_for_tests( + let bank0 = Arc::new(Bank::new_with_config_for_tests( &genesis_config, AccountSecondaryIndexes::default(), AccountShrinkThreshold::default(), @@ -7036,7 +7036,7 @@ fn test_clean_nonrooted() { let account_zero = AccountSharedData::new(0, 0, &Pubkey::new_unique()); - goto_end_of_slot(Arc::::get_mut(&mut bank0).unwrap()); + goto_end_of_slot_without_scheduler(&bank0); bank0.freeze(); bank0.squash(); // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later @@ -7045,9 +7045,9 @@ fn test_clean_nonrooted() { // Store some lamports in bank 1 let some_lamports = 123; - let mut bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); bank1.deposit(&pubkey0, some_lamports).unwrap(); - goto_end_of_slot(Arc::::get_mut(&mut bank1).unwrap()); + goto_end_of_slot_without_scheduler(&bank1); bank1.freeze(); bank1.flush_accounts_cache_slot_for_tests(); @@ -7055,10 +7055,10 @@ fn test_clean_nonrooted() { // Store some lamports for pubkey1 in bank 2, root bank 2 // bank2's parent is bank0 - let mut bank2 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 2)); + let bank2 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 2)); bank2.deposit(&pubkey1, some_lamports).unwrap(); bank2.store_account(&pubkey0, &account_zero); - goto_end_of_slot(Arc::::get_mut(&mut bank2).unwrap()); + goto_end_of_slot_without_scheduler(&bank2); bank2.freeze(); bank2.squash(); bank2.force_flush_accounts_cache(); @@ -7070,9 +7070,9 @@ fn test_clean_nonrooted() { // candidate set bank2.clean_accounts_for_tests(); - let mut bank3 = Arc::new(Bank::new_from_parent(&bank2, &Pubkey::default(), 3)); + let bank3 = Arc::new(Bank::new_from_parent(&bank2, &Pubkey::default(), 3)); bank3.deposit(&pubkey1, some_lamports + 1).unwrap(); - goto_end_of_slot(Arc::::get_mut(&mut bank3).unwrap()); + goto_end_of_slot_without_scheduler(&bank3); bank3.freeze(); bank3.squash(); bank3.force_flush_accounts_cache(); @@ -7103,7 +7103,7 @@ fn test_shrink_candidate_slots_cached() { let pubkey2 = solana_sdk::pubkey::new_rand(); // Set root for bank 0, with caching enabled - let mut bank0 = Arc::new(Bank::new_with_config_for_tests( + let bank0 = Arc::new(Bank::new_with_config_for_tests( &genesis_config, AccountSecondaryIndexes::default(), AccountShrinkThreshold::default(), @@ -7115,7 +7115,7 @@ fn test_shrink_candidate_slots_cached() { let account0 = AccountSharedData::new(1000, pubkey0_size, &Pubkey::new_unique()); bank0.store_account(&pubkey0, &account0); - goto_end_of_slot(Arc::::get_mut(&mut bank0).unwrap()); + goto_end_of_slot_without_scheduler(&bank0); bank0.freeze(); bank0.squash(); // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later @@ -7124,10 +7124,10 @@ fn test_shrink_candidate_slots_cached() { // Store some lamports in bank 1 let some_lamports = 123; - let mut bank1 = Arc::new(new_from_parent(&bank0)); + let bank1 = Arc::new(new_from_parent(&bank0)); bank1.deposit(&pubkey1, some_lamports).unwrap(); bank1.deposit(&pubkey2, some_lamports).unwrap(); - goto_end_of_slot(Arc::::get_mut(&mut bank1).unwrap()); + goto_end_of_slot_without_scheduler(&bank1); bank1.freeze(); bank1.squash(); // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later @@ -7135,10 +7135,10 @@ fn test_shrink_candidate_slots_cached() { bank1.force_flush_accounts_cache(); // Store some lamports for pubkey1 in bank 2, root bank 2 - let mut bank2 = Arc::new(new_from_parent(&bank1)); + let bank2 = Arc::new(new_from_parent(&bank1)); bank2.deposit(&pubkey1, some_lamports).unwrap(); bank2.store_account(&pubkey0, &account0); - goto_end_of_slot(Arc::::get_mut(&mut bank2).unwrap()); + goto_end_of_slot_without_scheduler(&bank2); bank2.freeze(); bank2.squash(); bank2.force_flush_accounts_cache(); @@ -12754,6 +12754,7 @@ fn test_runtime_feature_enable_with_executor_cache() { let mut root_bank = Bank::new_for_tests(&genesis_config); let (name, id, entrypoint) = solana_bpf_loader_program!(); root_bank.add_builtin(&name, &id, entrypoint); + let root_bank = Arc::new(root_bank); // Test a basic transfer let amount = genesis_config.rent.minimum_balance(0); @@ -12782,8 +12783,8 @@ fn test_runtime_feature_enable_with_executor_cache() { let transaction1 = Transaction::new(&signers1, message1, root_bank.last_blockhash()); // Advance the bank so the next transaction can be submitted. - goto_end_of_slot(&mut root_bank); - let mut bank = new_from_parent(&Arc::new(root_bank)); + goto_end_of_slot_without_scheduler(&root_bank); + let mut bank = new_from_parent(&root_bank); // Compose second instruction using the same program with a different block hash let instruction2 = Instruction::new_with_bytes(program_keypair.pubkey(), &[], Vec::new()); diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index d68945b7a0d1cc..6b53307d0065a8 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -5,12 +5,15 @@ use { accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestType}, bank::{Bank, SquashTiming}, epoch_accounts_hash, - installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerPoolArc}, + installed_scheduler_pool::{ + BankWithScheduler, InstalledSchedulerPoolArc, SchedulingContext, + }, snapshot_config::SnapshotConfig, }, log::*, solana_measure::measure::Measure, solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph, WorkingSlot}, + solana_scheduler::SchedulingMode, solana_sdk::{clock::Slot, feature_set, hash::Hash, timing}, std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -136,7 +139,11 @@ impl BankForks { } pub fn get(&self, bank_slot: Slot) -> Option> { - self.banks.get(&bank_slot).map(|b| b.bank_cloned()) + self.get_with_scheduler(bank_slot).map(|b| b.bank_cloned()) + } + + pub fn get_with_scheduler(&self, bank_slot: Slot) -> Option { + self.banks.get(&bank_slot).cloned() } pub fn get_with_checked_hash( @@ -163,11 +170,11 @@ impl BankForks { // Iterate through the heads of all the different forks for bank in initial_forks { - banks.insert(bank.slot(), BankWithScheduler::new(bank.clone())); + banks.insert(bank.slot(), BankWithScheduler::new(bank.clone(), None)); let parents = bank.parents(); for parent in parents { if banks - .insert(parent.slot(), BankWithScheduler::new(parent.clone())) + .insert(parent.slot(), BankWithScheduler::new(parent.clone(), None)) .is_some() { // All ancestors have already been inserted by another fork @@ -194,19 +201,23 @@ impl BankForks { } } - pub fn insert(&mut self, bank: Bank) -> Arc { + pub fn insert(&mut self, bank: Bank) -> BankWithScheduler { let bank = Arc::new(bank); - let prev = self - .banks - .insert(bank.slot(), BankWithScheduler::new(bank.clone())); + let bank_with_scheduler = if let Some(scheduler_pool) = &self.scheduler_pool { + let context = SchedulingContext::new(SchedulingMode::BlockVerification, bank.clone()); + let scheduler = scheduler_pool.take_from_pool(context); + BankWithScheduler::new(bank.clone(), Some(scheduler)) + } else { + BankWithScheduler::new(bank.clone(), None) + }; + let prev = self.banks.insert(bank.slot(), bank_with_scheduler.clone()); assert!(prev.is_none()); let slot = bank.slot(); self.descendants.entry(slot).or_default(); for parent in bank.proper_ancestors() { self.descendants.entry(parent).or_default().insert(slot); } - self.install_scheduler_into_bank(&bank); - bank + bank_with_scheduler } pub fn remove(&mut self, slot: Slot) -> Option> { @@ -239,6 +250,10 @@ impl BankForks { self[self.highest_slot()].clone() } + pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler { + &self.banks[&self.highest_slot()] + } + fn do_set_root_return_metrics( &mut self, root: Slot, @@ -701,7 +716,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let mut bank_forks = BankForks::new(bank); let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1); - child_bank.register_tick(&Hash::default()); + child_bank.register_default_tick_for_test(); bank_forks.insert(child_bank); assert_eq!(bank_forks[1u64].tick_height(), 1); assert_eq!(bank_forks.working_bank().tick_height(), 1); @@ -884,9 +899,10 @@ mod tests { #[test] fn test_bank_forks_with_set_root() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); - let mut banks = vec![Arc::new(Bank::new_for_tests(&genesis_config))]; - assert_eq!(banks[0].slot(), 0); - let mut bank_forks = BankForks::new_from_banks(&banks, 0); + let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); + assert_eq!(bank0.slot(), 0); + let mut bank_forks = BankForks::new_from_banks(&[bank0], 0); + let mut banks = vec![bank_forks.get_with_scheduler(0).unwrap()]; banks.push(bank_forks.insert(Bank::new_from_parent(&banks[0], &Pubkey::default(), 1))); banks.push(bank_forks.insert(Bank::new_from_parent(&banks[1], &Pubkey::default(), 2))); banks.push(bank_forks.insert(Bank::new_from_parent(&banks[0], &Pubkey::default(), 3))); @@ -943,9 +959,10 @@ mod tests { #[test] fn test_bank_forks_with_highest_confirmed_root() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); - let mut banks = vec![Arc::new(Bank::new_for_tests(&genesis_config))]; - assert_eq!(banks[0].slot(), 0); - let mut bank_forks = BankForks::new_from_banks(&banks, 0); + let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); + assert_eq!(bank0.slot(), 0); + let mut bank_forks = BankForks::new_from_banks(&[bank0], 0); + let mut banks = vec![bank_forks.get_with_scheduler(0).unwrap()]; banks.push(bank_forks.insert(Bank::new_from_parent(&banks[0], &Pubkey::default(), 1))); banks.push(bank_forks.insert(Bank::new_from_parent(&banks[1], &Pubkey::default(), 2))); banks.push(bank_forks.insert(Bank::new_from_parent(&banks[0], &Pubkey::default(), 3))); diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 1a26cab98ec573..ab16b3bed526a0 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -34,7 +34,12 @@ use { slot_history::Slot, transaction::{Result, SanitizedTransaction}, }, - std::{borrow::Borrow, fmt::Debug, ops::Deref, sync::Arc}, + std::{ + borrow::Borrow, + fmt::Debug, + ops::Deref, + sync::{Arc, RwLock}, + }, }; // Send + Sync is needed to be a field of BankForks @@ -109,12 +114,6 @@ pub trait InstalledScheduler: Send + Sync + Debug { // Calling this is illegal as soon as schedule_termiantion is called on &self. fn schedule_execution<'a>(&'a self, transaction_with_index: SEA::TransactionWithIndex<'a>); - // This optionally signals scheduling termination request to the scheduler. - // This is subtle but important, to break circular dependency of Arc => Scheduler => - // SchedulingContext => Arc in the middle of the tear-down process, otherwise it would - // prevent Bank::drop()'s last resort scheduling termination attempt indefinitely - fn schedule_termination(&mut self); - #[must_use] fn wait_for_termination(&mut self, reason: &WaitReason) -> Option; @@ -161,26 +160,12 @@ pub enum WaitReason { // most normal termination waiting mode; couldn't be done implicitly inside Bank::freeze() -> () to return // the result and timing in some way to higher-layer subsystems; TerminatedToFreeze, - TerminatedFromBankDrop, - TerminatedInternallyByScheduler, + DroppedFromBankForks, // scheduler will be restarted without being returned to pool in order to reuse it immediately. ReinitializedForRecentBlockhash, } pub type InstalledSchedulerBox = Box>; -// somewhat arbitrary new type just to pacify Bank's frozen_abi... -#[derive(Debug, Default)] -pub(crate) struct InstalledSchedulerBoxInBank(Option); - -#[cfg(RUSTC_WITH_SPECIALIZATION)] -use solana_frozen_abi::abi_example::AbiExample; - -#[cfg(RUSTC_WITH_SPECIALIZATION)] -impl AbiExample for InstalledSchedulerBoxInBank { - fn example() -> Self { - Self(None) - } -} #[derive(Clone, Debug)] pub struct SchedulingContext { @@ -222,76 +207,74 @@ impl SchedulingContext { } } -// tiny wrapper to ensure to call schedule_termination() via ::drop() inside +// tiny wrapper to ensure to call wait_for_termination() via ::drop() inside // BankForks::set_root()'s pruning. -pub(crate) struct BankWithScheduler(Arc); +#[derive(Clone)] +pub struct BankWithScheduler { + inner: Arc, +} -impl BankWithScheduler { - pub(crate) fn new(bank: Arc) -> Self { - Self(bank) - } +#[derive(Debug)] +pub struct BankWithSchedulerInner { + bank: Arc, + scheduler: InstalledSchedulerRwLock, +} +pub type InstalledSchedulerRwLock = RwLock>; - pub(crate) fn bank_cloned(&self) -> Arc { - self.0.clone() - } +#[allow(clippy::declare_interior_mutable_const)] +pub const NO_INSTALLED_SCHEDULER_RW_LOCK: InstalledSchedulerRwLock = RwLock::new(None); - pub(crate) fn bank(&self) -> &Arc { - &self.0 +impl BankWithScheduler { + pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { + Self { + inner: Arc::new(BankWithSchedulerInner { + bank, + scheduler: RwLock::new(scheduler), + }), + } } - pub(crate) fn into_bank(self) -> Arc { - let bank = self.bank_cloned(); - drop(self); - bank + #[cfg(any(test, feature = "test-in-workspace"))] + pub fn new_for_test(bank: Arc, scheduler: Option) -> Self { + Self::new(bank, scheduler) } -} -impl Drop for BankWithScheduler { - fn drop(&mut self) { - self.0.schedule_termination(); + pub fn new_without_scheduler(bank: Arc) -> Self { + Self::new(bank, None) } -} -impl Deref for BankWithScheduler { - type Target = Bank; + pub fn bank_cloned(&self) -> Arc { + self.bank().clone() + } - fn deref(&self) -> &Self::Target { - &self.0 + pub fn bank(&self) -> &Arc { + &self.inner.bank } -} -impl BankForks { - pub fn install_scheduler_pool(&mut self, pool: InstalledSchedulerPoolArc) { - info!("Installed new scheduler_pool into bank_forks: {:?}", pool); - assert!(self.scheduler_pool.replace(pool).is_none()); + // don't indefintely lend out scheduler refs to avoid accidental mix because scheduler should + // strictly tied to bank + pub fn with_scheduler_lock(&self, callback: impl FnOnce(&InstalledSchedulerRwLock)) { + callback(&self.inner.scheduler) } - pub(crate) fn install_scheduler_into_bank(&self, bank: &Arc) { - if let Some(scheduler_pool) = &self.scheduler_pool { - let context = SchedulingContext::new(SchedulingMode::BlockVerification, bank.clone()); - bank.install_scheduler(scheduler_pool.take_from_pool(context)); - } + pub fn has_installed_scheduler(&self) -> bool { + self.inner.scheduler.read().unwrap().is_some() } -} -impl Bank { - pub fn install_scheduler(&self, scheduler: InstalledSchedulerBox) { - let mut scheduler_guard = self.scheduler.write().expect("not poisoned"); - assert!(scheduler_guard.0.replace(scheduler).is_none()); + pub(crate) fn into_bank(self) -> Arc { + let bank = self.bank_cloned(); + drop(self); + bank } - pub fn with_scheduler(&self) -> bool { - self.scheduler.read().expect("not poisoned").0.is_some() + #[must_use] + pub fn wait_for_completed_scheduler(&self) -> Option { + self.inner + .wait_for_scheduler(WaitReason::TerminatedToFreeze) } - pub fn with_scheduling_context(&self) -> bool { - self.scheduler - .read() - .expect("not poisoned") - .0 - .as_ref() - .and_then(|scheduler| scheduler.context()) - .is_some() + pub(crate) fn wait_for_reusable_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) { + BankWithSchedulerInner::wait_for_reusable_scheduler(bank, scheduler); } pub fn schedule_transaction_executions<'a>( @@ -304,31 +287,42 @@ impl Bank { transactions.len() ); - let scheduler_guard = self.scheduler.read().expect("not poisoned"); - let scheduler = scheduler_guard.0.as_ref().expect("active scheduler"); + let scheduler_guard = self.inner.scheduler.read().unwrap(); + let scheduler = scheduler_guard.as_ref().unwrap(); for (sanitized_transaction, &index) in transactions.iter().zip(transaction_indexes) { scheduler.schedule_execution(&(sanitized_transaction, index)); } } - fn schedule_termination(&self) { - let mut scheduler_guard = self.scheduler.write().expect("not poisoned"); - if let Some(scheduler) = scheduler_guard.0.as_mut() { - scheduler.schedule_termination(); - } + pub const fn no_scheduler_available() -> InstalledSchedulerRwLock { + NO_INSTALLED_SCHEDULER_RW_LOCK } + #[cfg(any(test, feature = "test-in-workspace"))] + pub fn drop_scheduler(&self) { + self.inner.drop_scheduler(); + } +} + +impl BankWithSchedulerInner { #[must_use] fn wait_for_scheduler(&self, reason: WaitReason) -> Option { + Self::do_wait_for_scheduler(&self.bank, &self.scheduler, reason) + } + + #[must_use] + fn do_wait_for_scheduler( + bank: &Bank, + scheduler: &InstalledSchedulerRwLock, + reason: WaitReason, + ) -> Option { debug!( "wait_for_scheduler(slot: {}, reason: {reason:?}): started...", - self.slot() + bank.slot() ); - let mut scheduler_guard = self.scheduler.write().expect("not poisoned"); - let scheduler = &mut scheduler_guard.0; - + let mut scheduler = scheduler.write().unwrap(); let result_with_timings = if scheduler.is_some() { let result_with_timings = scheduler .as_mut() @@ -343,48 +337,60 @@ impl Bank { }; debug!( "wait_for_scheduler(slot: {}, reason: {reason:?}): finished with: {:?}...", - self.slot(), + bank.slot(), result_with_timings.as_ref().map(|(result, _)| result) ); result_with_timings } - #[must_use] - pub fn wait_for_completed_scheduler(&self) -> Option { - self.wait_for_scheduler(WaitReason::TerminatedToFreeze) - } - #[must_use] fn wait_for_completed_scheduler_from_drop(&self) -> Option> { - let maybe_timings_and_result = self.wait_for_scheduler(WaitReason::TerminatedFromBankDrop); + let maybe_timings_and_result = self.wait_for_scheduler(WaitReason::DroppedFromBankForks); maybe_timings_and_result.map(|(result, _timings)| result) } - pub fn wait_for_completed_scheduler_from_scheduler_drop(self) { - let maybe_timings_and_result = - self.wait_for_scheduler(WaitReason::TerminatedInternallyByScheduler); - assert!(maybe_timings_and_result.is_some()); - } - - pub(crate) fn wait_for_reusable_scheduler(&self) { - let maybe_timings_and_result = - self.wait_for_scheduler(WaitReason::ReinitializedForRecentBlockhash); + fn wait_for_reusable_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) { + let maybe_timings_and_result = Self::do_wait_for_scheduler( + bank, + scheduler, + WaitReason::ReinitializedForRecentBlockhash, + ); assert!(maybe_timings_and_result.is_none()); } - pub(crate) fn drop_scheduler(&mut self) { - if self.with_scheduler() { - if let Some(Err(err)) = self.wait_for_completed_scheduler_from_drop() { - warn!( - "Bank::drop(): slot: {} discarding error from scheduler: {err:?}", - self.slot(), - ); - } + fn drop_scheduler(&self) { + if let Some(Err(err)) = self.wait_for_completed_scheduler_from_drop() { + warn!( + "BankWithSchedulerInner::drop(): slot: {} discarding error from scheduler: {:?}", + self.bank.slot(), + err, + ); } } } +impl Drop for BankWithSchedulerInner { + fn drop(&mut self) { + self.drop_scheduler(); + } +} + +impl Deref for BankWithScheduler { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner.bank + } +} + +impl BankForks { + pub fn install_scheduler_pool(&mut self, pool: InstalledSchedulerPoolArc) { + info!("Installed new scheduler_pool into bank_forks: {:?}", pool); + assert!(self.scheduler_pool.replace(pool).is_none()); + } +} + #[cfg(test)] mod tests { use { @@ -462,10 +468,13 @@ mod tests { fn test_scheduler_normal_termination() { solana_logger::setup(); - let bank = Bank::default_for_tests(); - bank.install_scheduler(setup_mocked_scheduler( - [WaitReason::TerminatedToFreeze].into_iter(), - )); + let bank = Arc::new(Bank::default_for_tests()); + let bank = BankWithScheduler::new_for_test( + bank, + Some(setup_mocked_scheduler( + [WaitReason::TerminatedToFreeze].into_iter(), + )), + ); assert!(bank.wait_for_completed_scheduler().is_none()); } @@ -473,10 +482,13 @@ mod tests { fn test_scheduler_termination_from_drop() { solana_logger::setup(); - let bank = Bank::default_for_tests(); - bank.install_scheduler(setup_mocked_scheduler( - [WaitReason::TerminatedFromBankDrop].into_iter(), - )); + let bank = Arc::new(Bank::default_for_tests()); + let bank = BankWithScheduler::new_for_test( + bank, + Some(setup_mocked_scheduler( + [WaitReason::DroppedFromBankForks].into_iter(), + )), + ); drop(bank); } @@ -484,15 +496,18 @@ mod tests { fn test_scheduler_reinitialization() { solana_logger::setup(); - let mut bank = crate::bank::tests::create_simple_test_bank(42); - bank.install_scheduler(setup_mocked_scheduler( - [ - WaitReason::ReinitializedForRecentBlockhash, - WaitReason::TerminatedFromBankDrop, - ] - .into_iter(), - )); - goto_end_of_slot(&mut bank); + let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42)); + let bank = BankWithScheduler::new_for_test( + bank, + Some(setup_mocked_scheduler( + [ + WaitReason::ReinitializedForRecentBlockhash, + WaitReason::DroppedFromBankForks, + ] + .into_iter(), + )), + ); + goto_end_of_slot(&bank); } #[test] @@ -510,9 +525,9 @@ mod tests { 2, genesis_config.hash(), )); - let bank = &Arc::new(Bank::new_for_tests(&genesis_config)); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let mocked_scheduler = setup_mocked_scheduler_with_extra( - [WaitReason::TerminatedFromBankDrop].into_iter(), + [WaitReason::DroppedFromBankForks].into_iter(), Some( |mocked: &mut MockInstalledScheduler| { mocked @@ -523,7 +538,7 @@ mod tests { ), ); - bank.install_scheduler(mocked_scheduler); + let bank = BankWithScheduler::new_for_test(bank, Some(mocked_scheduler)); bank.schedule_transaction_executions(&[tx0], [0].iter()); } } diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 78350bff301675..3cd931cb302351 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -4195,7 +4195,7 @@ mod tests { let original_bank = Bank::new_for_tests(&genesis_config); while !original_bank.is_complete() { - original_bank.register_tick(&Hash::new_unique()); + original_bank.register_unique_tick(); } let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests(); @@ -4265,7 +4265,7 @@ mod tests { .transfer(sol_to_lamports(3.), &mint_keypair, &key3.pubkey()) .unwrap(); while !bank0.is_complete() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_unique_tick(); } let slot = 1; @@ -4280,7 +4280,7 @@ mod tests { .transfer(sol_to_lamports(5.), &mint_keypair, &key5.pubkey()) .unwrap(); while !bank1.is_complete() { - bank1.register_tick(&Hash::new_unique()); + bank1.register_unique_tick(); } let slot = slot + 1; @@ -4289,7 +4289,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank2.is_complete() { - bank2.register_tick(&Hash::new_unique()); + bank2.register_unique_tick(); } let slot = slot + 1; @@ -4298,7 +4298,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank3.is_complete() { - bank3.register_tick(&Hash::new_unique()); + bank3.register_unique_tick(); } let slot = slot + 1; @@ -4307,7 +4307,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank4.is_complete() { - bank4.register_tick(&Hash::new_unique()); + bank4.register_unique_tick(); } let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests(); @@ -4383,7 +4383,7 @@ mod tests { .transfer(sol_to_lamports(3.), &mint_keypair, &key3.pubkey()) .unwrap(); while !bank0.is_complete() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_unique_tick(); } let slot = 1; @@ -4398,7 +4398,7 @@ mod tests { .transfer(sol_to_lamports(5.), &mint_keypair, &key5.pubkey()) .unwrap(); while !bank1.is_complete() { - bank1.register_tick(&Hash::new_unique()); + bank1.register_unique_tick(); } let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests(); @@ -4426,7 +4426,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank2.is_complete() { - bank2.register_tick(&Hash::new_unique()); + bank2.register_unique_tick(); } let slot = slot + 1; @@ -4435,7 +4435,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank3.is_complete() { - bank3.register_tick(&Hash::new_unique()); + bank3.register_unique_tick(); } let slot = slot + 1; @@ -4444,7 +4444,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank4.is_complete() { - bank4.register_tick(&Hash::new_unique()); + bank4.register_unique_tick(); } let incremental_snapshot_archive_info = bank_to_incremental_snapshot_archive( @@ -4505,7 +4505,7 @@ mod tests { .transfer(sol_to_lamports(3.), &mint_keypair, &key3.pubkey()) .unwrap(); while !bank0.is_complete() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_unique_tick(); } let slot = 1; @@ -4520,7 +4520,7 @@ mod tests { .transfer(sol_to_lamports(3.), &mint_keypair, &key3.pubkey()) .unwrap(); while !bank1.is_complete() { - bank1.register_tick(&Hash::new_unique()); + bank1.register_unique_tick(); } let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests(); @@ -4548,7 +4548,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank2.is_complete() { - bank2.register_tick(&Hash::new_unique()); + bank2.register_unique_tick(); } let slot = slot + 1; @@ -4557,7 +4557,7 @@ mod tests { .transfer(sol_to_lamports(2.), &mint_keypair, &key2.pubkey()) .unwrap(); while !bank3.is_complete() { - bank3.register_tick(&Hash::new_unique()); + bank3.register_unique_tick(); } let slot = slot + 1; @@ -4566,7 +4566,7 @@ mod tests { .transfer(sol_to_lamports(3.), &mint_keypair, &key3.pubkey()) .unwrap(); while !bank4.is_complete() { - bank4.register_tick(&Hash::new_unique()); + bank4.register_unique_tick(); } bank_to_incremental_snapshot_archive( @@ -4656,7 +4656,7 @@ mod tests { .transfer(lamports_to_transfer, &mint_keypair, &key2.pubkey()) .unwrap(); while !bank0.is_complete() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_unique_tick(); } let slot = 1; @@ -4665,7 +4665,7 @@ mod tests { .transfer(lamports_to_transfer, &key2, &key1.pubkey()) .unwrap(); while !bank1.is_complete() { - bank1.register_tick(&Hash::new_unique()); + bank1.register_unique_tick(); } let full_snapshot_slot = slot; @@ -4704,7 +4704,7 @@ mod tests { "Ensure Account1's balance is zero" ); while !bank2.is_complete() { - bank2.register_tick(&Hash::new_unique()); + bank2.register_unique_tick(); } // Take an incremental snapshot and then do a roundtrip on the bank and ensure it @@ -4754,13 +4754,13 @@ mod tests { .transfer(lamports_to_transfer, &mint_keypair, &key2.pubkey()) .unwrap(); while !bank3.is_complete() { - bank3.register_tick(&Hash::new_unique()); + bank3.register_unique_tick(); } let slot = slot + 1; let bank4 = Arc::new(Bank::new_from_parent(&bank3, &collector, slot)); while !bank4.is_complete() { - bank4.register_tick(&Hash::new_unique()); + bank4.register_unique_tick(); } // Ensure account1 has been cleaned/purged from everywhere @@ -4828,13 +4828,13 @@ mod tests { let (genesis_config, mint_keypair) = create_genesis_config(sol_to_lamports(1_000_000.)); let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); while !bank0.is_complete() { - bank0.register_tick(&Hash::new_unique()); + bank0.register_unique_tick(); } let slot = 1; let bank1 = Arc::new(Bank::new_from_parent(&bank0, &collector, slot)); while !bank1.is_complete() { - bank1.register_tick(&Hash::new_unique()); + bank1.register_unique_tick(); } let all_snapshots_dir = tempfile::TempDir::new().unwrap(); @@ -4859,7 +4859,7 @@ mod tests { .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey()) .unwrap(); while !bank2.is_complete() { - bank2.register_tick(&Hash::new_unique()); + bank2.register_unique_tick(); } bank_to_incremental_snapshot_archive( diff --git a/scheduler-pool/Cargo.toml b/scheduler-pool/Cargo.toml index 03fbdeaea00f0b..8393c84e0a1426 100644 --- a/scheduler-pool/Cargo.toml +++ b/scheduler-pool/Cargo.toml @@ -23,3 +23,4 @@ crossbeam-channel = { workspace = true } jemallocator = { workspace = true } log = { workspace = true } solana-logger = { workspace = true } +solana-runtime = { workspace = true, features = ["test-in-workspace"] } diff --git a/scheduler-pool/benches/scheduler.rs b/scheduler-pool/benches/scheduler.rs index ddbb3216d87d8a..7b143f0dffb363 100644 --- a/scheduler-pool/benches/scheduler.rs +++ b/scheduler-pool/benches/scheduler.rs @@ -300,10 +300,6 @@ mod nonblocking { .unwrap(); } - fn schedule_termination(&mut self) { - unimplemented!("not needed for this bench"); - } - fn wait_for_termination(&mut self, _wait_reason: &WaitReason) -> Option { let (next_transaction_sender, next_transaction_receiver) = crossbeam_channel::unbounded::(); diff --git a/scheduler-pool/src/lib.rs b/scheduler-pool/src/lib.rs index 0a3c48b3ffd173..0eb7566f423406 100644 --- a/scheduler-pool/src/lib.rs +++ b/scheduler-pool/src/lib.rs @@ -86,8 +86,6 @@ impl SchedulerPool { impl InstalledSchedulerPool for SchedulerPool { fn take_from_pool(&self, context: SchedulingContext) -> InstalledSchedulerBox { - assert!(!context.bank().with_scheduler()); - let mut schedulers = self.schedulers.lock().expect("not poisoned"); // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been // returned recently @@ -214,22 +212,13 @@ impl, SEA: ScheduleExecutionArg> InstalledS } } - fn schedule_termination(&mut self) { - drop::>(self.context.take()); - } - fn wait_for_termination(&mut self, wait_reason: &WaitReason) -> Option { let keep_result_with_timings = match wait_reason { - WaitReason::ReinitializedForRecentBlockhash => { - // rustfmt... - true - } - WaitReason::TerminatedToFreeze - | WaitReason::TerminatedFromBankDrop - | WaitReason::TerminatedInternallyByScheduler => false, + WaitReason::ReinitializedForRecentBlockhash => true, + WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false, }; - self.schedule_termination(); + drop::>(self.context.take()); // current simplest form of this trait impl doesn't block the current thread materially // just with the following single mutex lock. Suppose more elaborated synchronization @@ -265,7 +254,7 @@ mod tests { bank::Bank, bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, - installed_scheduler_pool::SchedulingContext, + installed_scheduler_pool::{BankWithScheduler, SchedulingContext}, prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ @@ -409,17 +398,19 @@ mod tests { // existing banks in bank_forks shouldn't process transactions anymore in general, so // shouldn't be touched - assert!(!bank_forks.working_bank().with_scheduler()); + assert!(!bank_forks + .working_bank_with_scheduler() + .has_installed_scheduler()); bank_forks.install_scheduler_pool(pool); - assert!(!bank_forks.working_bank().with_scheduler()); + assert!(!bank_forks + .working_bank_with_scheduler() + .has_installed_scheduler()); - assert!(!child_bank.with_scheduler()); - bank_forks.insert(child_bank); - let child_bank = bank_forks.working_bank(); - assert!(child_bank.with_scheduler()); - assert!(child_bank.with_scheduling_context()); + let child_bank = bank_forks.insert(child_bank); + assert!(child_bank.has_installed_scheduler()); bank_forks.remove(child_bank.slot()); - assert!(!child_bank.with_scheduling_context()); + child_bank.drop_scheduler(); + assert!(!child_bank.has_installed_scheduler()); } #[test] @@ -437,7 +428,7 @@ mod tests { 2, genesis_config.hash(), )); - let bank = &Arc::new(Bank::new_for_tests(&genesis_config)); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::new_dyn(None, None, None, _ignored_prioritization_fee_cache); let context = SchedulingContext::new(SchedulingMode::BlockVerification, bank.clone()); @@ -445,9 +436,9 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_from_pool(context); scheduler.schedule_execution(&(tx0, 0)); - assert_eq!(bank.transaction_count(), 1); - bank.install_scheduler(scheduler); + let bank = BankWithScheduler::new_for_test(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); } #[test] @@ -466,7 +457,7 @@ mod tests { 2, genesis_config.hash(), )); - let bank = &Arc::new(Bank::new_for_tests(&genesis_config)); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::new_dyn(None, None, None, _ignored_prioritization_fee_cache); let context = SchedulingContext::new(SchedulingMode::BlockVerification, bank.clone()); @@ -490,7 +481,7 @@ mod tests { // transaction_count should remain same as scheduler should be bailing out. assert_eq!(bank.transaction_count(), 0); - bank.install_scheduler(scheduler); + let bank = BankWithScheduler::new_for_test(bank.clone(), Some(scheduler)); assert_matches!( bank.wait_for_completed_scheduler(), Some((