This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Add new execution code-path for unified scheduler
ryoqun committed Apr 18, 2023
1 parent e74bc4e commit 3597dfb
Showing 24 changed files with 1,459 additions and 71 deletions.
61 changes: 61 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -76,6 +76,8 @@ members = [
"rpc-test",
"runtime",
"runtime/store-tool",
"scheduler",
"scheduler-pool",
"sdk",
"sdk/cargo-build-bpf",
"sdk/cargo-build-sbf",
@@ -231,6 +233,7 @@ memmap2 = "0.5.10"
memoffset = "0.8"
merlin = "3"
min-max-heap = "1.3.0"
mockall = "0.11.4"
modular-bitfield = "0.11.2"
nix = "0.25.1"
num-bigint = "0.4.3"
@@ -340,6 +343,8 @@ solana-rpc-client = { path = "rpc-client", version = "=1.16.0", default-features
solana-rpc-client-api = { path = "rpc-client-api", version = "=1.16.0" }
solana-rpc-client-nonce-utils = { path = "rpc-client-nonce-utils", version = "=1.16.0" }
solana-runtime = { path = "runtime", version = "=1.16.0" }
solana-scheduler = { path = "scheduler", version = "=1.16.0" }
solana-scheduler-pool = { path = "scheduler-pool", version = "=1.16.0" }
solana-sdk = { path = "sdk", version = "=1.16.0" }
solana-sdk-macro = { path = "sdk/macro", version = "=1.16.0" }
solana-send-transaction-service = { path = "send-transaction-service", version = "=1.16.0" }
1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -55,6 +55,7 @@ solana-rayon-threadlimit = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }
solana-scheduler-pool = { workspace = true }
solana-sdk = { workspace = true }
solana-send-transaction-service = { workspace = true }
solana-streamer = { workspace = true }
42 changes: 36 additions & 6 deletions core/src/replay_stage.rs
@@ -40,7 +40,8 @@ use {
block_error::BlockError,
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
self, BlockstoreProcessorError, ConfirmationProgress, ExecuteBatchesInternalMetrics,
TransactionStatusSender,
},
leader_schedule_cache::LeaderScheduleCache,
leader_schedule_utils::first_of_consecutive_leader_slots,
@@ -2609,7 +2610,6 @@ impl ReplayStage {
match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count,
Err(err) => {
// Error means the slot needs to be marked as dead
Self::mark_dead_slot(
blockstore,
bank,
@@ -2640,14 +2640,44 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();
let r_replay_stats = replay_stats.read().unwrap();
let mut replay_stats = replay_stats.write().unwrap();

if let Some((result, complete_execute_timings)) =
bank.wait_for_completed_scheduler()
{
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
complete_execute_timings,
);
replay_stats.batch_execute.accumulate(metrics);

if let Err(err) = result {
Self::mark_dead_slot(
blockstore,
bank,
bank_forks.read().unwrap().root(),
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
// If the bank was corrupted, abort now to prevent further normal processing
continue;
}
}

let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
debug!(
"bank {} has completed replay from blockstore, \
contribute to update cost with {:?}",
bank.slot(),
r_replay_stats.batch_execute.totals
replay_stats.batch_execute.totals
);
did_complete_bank = true;
let _ = cluster_slots_update_sender.send(vec![bank_slot]);
@@ -2735,14 +2765,14 @@ impl ReplayStage {
}
bank_complete_time.stop();

r_replay_stats.report_stats(
replay_stats.report_stats(
bank.slot(),
r_replay_progress.num_txs,
r_replay_progress.num_entries,
r_replay_progress.num_shreds,
bank_complete_time.as_us(),
);
execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
execute_timings.accumulate(&replay_stats.batch_execute.totals);
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",
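A minimal sketch (not part of the diff) of the scheduler-completion handling that the replay_stage.rs hunks above introduce: when a bank has a unified scheduler attached, replay waits for it, folds the per-thread execute timings into the slot's replay stats, and treats any transaction error like a replay failure so the caller can mark the slot dead. The helper name and standalone signature are illustrative; wait_for_completed_scheduler, ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads, and batch_execute.accumulate are the calls shown in the diff, and the ReplaySlotStats import path is an assumption.

use crate::progress_map::ReplaySlotStats; // path assumed; the diff reaches this type via the bank's fork progress entry
use solana_ledger::blockstore_processor::{
    BlockstoreProcessorError, ExecuteBatchesInternalMetrics,
};
use solana_runtime::bank::Bank;

// Illustrative helper: absorb a completed scheduler's results into the slot's
// replay stats and surface any transaction error to the caller.
fn absorb_scheduler_results(
    bank: &Bank,
    replay_stats: &mut ReplaySlotStats,
) -> Result<(), BlockstoreProcessorError> {
    // `None` means no scheduler was installed for this bank, i.e. the existing
    // blockstore-processor path executed the transactions itself.
    if let Some((result, complete_execute_timings)) = bank.wait_for_completed_scheduler() {
        // Convert the scheduler's per-thread timings into the same metrics
        // struct that batched execution already accumulates.
        let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
            complete_execute_timings,
        );
        replay_stats.batch_execute.accumulate(metrics);

        // A failed transaction invalidates the slot, exactly as on the
        // non-scheduler path; the diff responds by calling mark_dead_slot.
        result.map_err(BlockstoreProcessorError::InvalidTransaction)?;
    }
    Ok(())
}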
31 changes: 26 additions & 5 deletions core/src/validator.rs
@@ -92,6 +92,7 @@ use {
snapshot_hash::StartingSnapshotHashes,
snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path},
},
solana_scheduler_pool::SchedulerPool,
solana_sdk::{
clock::Slot,
epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
@@ -129,6 +130,7 @@ const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
pub enum BlockVerificationMethod {
#[default]
BlockstoreProcessor,
UnifiedScheduler,
}

impl BlockVerificationMethod {
@@ -747,6 +749,30 @@ impl Validator {
config.block_verification_method, config.block_production_method
);

let (replay_vote_sender, replay_vote_receiver) = unbounded();

// block min prioritization fee cache should be readable by RPC, and writable by validator
// (by both replay stage and banking stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

match &config.block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("not installing scheduler pool...");
}
BlockVerificationMethod::UnifiedScheduler => {
let scheduler_pool = SchedulerPool::new_dyn(
config.runtime_config.log_messages_bytes_limit,
transaction_status_sender.clone(),
Some(replay_vote_sender.clone()),
prioritization_fee_cache.clone(),
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(scheduler_pool);
}
}

let leader_schedule_cache = Arc::new(leader_schedule_cache);
let mut process_blockstore = ProcessBlockStore::new(
&id,
@@ -866,10 +892,6 @@ impl Validator {
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};

// block min prioritization fee cache should be readable by RPC, and writable by validator
// (by both replay stage and banking stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let (
json_rpc_service,
@@ -1068,7 +1090,6 @@ impl Validator {
info!("Disabled banking tracer");
}

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
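Condensed view of the wiring added to Validator::new above: the replay-vote channel and the prioritization-fee cache are now created before this point so the scheduler pool can hold clones of them (the channel creation moves up from its old spot just before Tvu::new), and the pool is installed only when the UnifiedScheduler method is configured. The sketch below is illustrative, written as if it lived alongside Validator::new; the import paths and the parameter types are assumptions, while SchedulerPool::new_dyn, its four arguments, and install_scheduler_pool are taken from the diff.

use std::sync::{Arc, RwLock};

use log::info;
use solana_ledger::blockstore_processor::TransactionStatusSender; // path assumed
use solana_runtime::{
    bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
    vote_sender_types::ReplayVoteSender,
};
use solana_scheduler_pool::SchedulerPool;

// Illustrative helper mirroring the match added to Validator::new.
fn maybe_install_scheduler_pool(
    method: &BlockVerificationMethod,
    bank_forks: &Arc<RwLock<BankForks>>,
    log_messages_bytes_limit: Option<usize>,
    transaction_status_sender: Option<TransactionStatusSender>,
    replay_vote_sender: &ReplayVoteSender,
    prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) {
    match method {
        BlockVerificationMethod::BlockstoreProcessor => {
            info!("not installing scheduler pool...");
        }
        BlockVerificationMethod::UnifiedScheduler => {
            // new_dyn presumably hands the pool over as a trait object, so
            // BankForks stays decoupled from the concrete SchedulerPool type.
            let scheduler_pool = SchedulerPool::new_dyn(
                log_messages_bytes_limit,
                transaction_status_sender,
                Some(replay_vote_sender.clone()),
                prioritization_fee_cache.clone(),
            );
            bank_forks
                .write()
                .unwrap()
                .install_scheduler_pool(scheduler_pool);
        }
    }
}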
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
@@ -35,6 +35,7 @@ solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-rpc = { workspace = true }
solana-runtime = { workspace = true }
solana-scheduler-pool = { workspace = true }
solana-sdk = { workspace = true }
solana-stake-program = { workspace = true }
solana-storage-bigtable = { workspace = true }
22 changes: 22 additions & 0 deletions ledger-tool/src/main.rs
@@ -68,6 +68,7 @@ use {
cost_model::CostModel,
cost_tracker::CostTracker,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
@@ -78,7 +79,9 @@ use {
move_and_async_delete_path, ArchiveFormat, SnapshotVersion,
DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
},
vote_sender_types::ReplayVoteSender,
},
solana_scheduler_pool::SchedulerPool,
solana_sdk::{
account::{AccountSharedData, ReadableAccount, WritableAccount},
account_utils::StateMut,
@@ -1260,6 +1263,25 @@ fn load_bank_forks(
"Using: block-verification-method: {}",
block_verification_method,
);
match block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("not installing scheduler pool...");
}
BlockVerificationMethod::UnifiedScheduler => {
let no_transaction_status_sender = None;
let no_replay_vote_sender = None::<ReplayVoteSender>;
let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
bank_forks
.write()
.unwrap()
.install_scheduler_pool(SchedulerPool::new_dyn(
process_options.runtime_config.log_messages_bytes_limit,
no_transaction_status_sender,
no_replay_vote_sender,
_ignored_prioritization_fee_cache,
));
}
}

let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
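The ledger-tool hunk above takes the same installation path in a verification-only setting: there is no transaction-status plumbing and no vote channel to feed, so both senders are None and the prioritization-fee cache is a throwaway (the diff names it _ignored_prioritization_fee_cache). A minimal sketch of that configuration; the helper name is illustrative and the import paths are assumptions, while the constructor arguments match the diff.

use std::sync::{Arc, RwLock};

use solana_runtime::{
    bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
    runtime_config::RuntimeConfig, vote_sender_types::ReplayVoteSender,
};
use solana_scheduler_pool::SchedulerPool;

// Illustrative helper: build and install a scheduler pool for offline
// verification, where none of the validator-only side channels exist.
fn install_offline_scheduler_pool(
    bank_forks: &Arc<RwLock<BankForks>>,
    runtime_config: &RuntimeConfig,
) {
    let no_transaction_status_sender = None;
    let no_replay_vote_sender = None::<ReplayVoteSender>;
    // A zero-sized cache, as in the diff; ledger-tool never reads it.
    let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
    bank_forks
        .write()
        .unwrap()
        .install_scheduler_pool(SchedulerPool::new_dyn(
            runtime_config.log_messages_bytes_limit,
            no_transaction_status_sender,
            no_replay_vote_sender,
            ignored_prioritization_fee_cache,
        ));
}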
1 change: 1 addition & 0 deletions ledger/Cargo.toml
@@ -74,6 +74,7 @@ bs58 = { workspace = true }
matches = { workspace = true }
solana-account-decoder = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["test-in-workspace"] }
test-case = { workspace = true }

[build-dependencies]
(Diffs for the remaining changed files are not shown.)
