Skip to content

Commit

Permalink
Add headers first validation to simpa
Browse files Browse the repository at this point in the history
  • Loading branch information
someone235 committed Dec 26, 2023
1 parent 10d0ebf commit 1ac721c
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions simpa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ struct Args {
#[arg(short, long)]
virtual_threads: Option<usize>,

#[arg(short, long, default_value_t = false)]
no_headers_first: bool,

/// Logging level for all subsystems {off, error, warn, info, debug, trace}
/// -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems
#[arg(long = "loglevel", default_value = format!("info,{}=trace", env!("CARGO_PKG_NAME")))]
Expand Down Expand Up @@ -242,7 +245,10 @@ fn main_impl(mut args: Args) {
unix_now(),
));
let handles2 = consensus2.run_processors();
rt.block_on(validate(&consensus, &consensus2, &config, args.delay, args.bps));
if !args.no_headers_first {
rt.block_on(validate(&consensus, &consensus2, &config, args.delay, args.bps, true));
}
rt.block_on(validate(&consensus, &consensus2, &config, args.delay, args.bps, false));
consensus2.shutdown(handles2);
if let Some(stop_perf_monitor) = stop_perf_monitor {
_ = rt.block_on(stop_perf_monitor);
Expand Down Expand Up @@ -311,34 +317,48 @@ fn apply_args_to_perf_params(args: &Args, perf_params: &mut PerfParams) {
}
}

async fn validate(src_consensus: &Consensus, dst_consensus: &Consensus, params: &Params, delay: f64, bps: f64) {
async fn validate(src_consensus: &Consensus, dst_consensus: &Consensus, params: &Params, delay: f64, bps: f64, header_only: bool) {
let hashes = topologically_ordered_hashes(src_consensus, params.genesis.hash);
let num_blocks = hashes.len();
let num_txs = print_stats(src_consensus, &hashes, delay, bps, params.ghostdag_k);
info!("Validating {num_blocks} blocks with {num_txs} transactions overall...");
if header_only {
info!("Validating {num_blocks} headers...");
} else {
info!("Validating {num_blocks} blocks with {num_txs} transactions overall...");
}

let start = std::time::Instant::now();
let chunks = hashes.into_iter().chunks(1000);
let mut iter = chunks.into_iter();
let mut chunk = iter.next().unwrap();
let mut prev_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk);
let mut prev_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk, header_only);

for (i, mut chunk) in iter.enumerate() {
let current_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk);
let current_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk, header_only);
let statuses = try_join_all(prev_joins).await.unwrap();
trace!("Validated chunk {}", i);
assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
if header_only {
assert!(statuses.iter().all(|s| s.is_header_only()));
} else {
assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
}
prev_joins = current_joins;
}

let statuses = try_join_all(prev_joins).await.unwrap();
assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
if header_only {
assert!(statuses.iter().all(|s| s.is_header_only()));
} else {
assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
}

// Assert that at least one body tip was resolved with valid UTXO
assert!(dst_consensus.body_tips().iter().copied().any(|h| dst_consensus.block_status(h) == BlockStatus::StatusUTXOValid));
let elapsed = start.elapsed();
info!(
"Total validation time: {:?}, block processing rate: {:.2} (b/s), transaction processing rate: {:.2} (t/s)",
"Total validation time: {:?}, {} processing rate: {:.2} (b/s), transaction processing rate: {:.2} (t/s)",
elapsed,
if header_only { "header" } else { "block" },
num_blocks as f64 / elapsed.as_secs_f64(),
num_txs as f64 / elapsed.as_secs_f64(),
);
Expand All @@ -348,12 +368,13 @@ fn submit_chunk(
src_consensus: &Consensus,
dst_consensus: &Consensus,
chunk: &mut impl Iterator<Item = Hash>,
header_only: bool,
) -> Vec<impl Future<Output = BlockProcessResult<BlockStatus>>> {
let mut futures = Vec::new();
for hash in chunk {
let block = Block::from_arcs(
src_consensus.headers_store.get_header(hash).unwrap(),
src_consensus.block_transactions_store.get(hash).unwrap(),
if header_only { Default::default() } else { src_consensus.block_transactions_store.get(hash).unwrap() },
);
let f = dst_consensus.validate_and_insert_block(block).virtual_state_task;
futures.push(f);
Expand Down

0 comments on commit 1ac721c

Please sign in to comment.