diff --git a/Cargo.lock b/Cargo.lock index 1102206f0fc..279b1ac2716 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,7 +1243,9 @@ dependencies = [ name = "congestion-model" version = "0.0.0" dependencies = [ + "chrono", "clap", + "csv", ] [[package]] diff --git a/tools/congestion-model/Cargo.toml b/tools/congestion-model/Cargo.toml index d700f63afe7..d0c035b4cdf 100644 --- a/tools/congestion-model/Cargo.toml +++ b/tools/congestion-model/Cargo.toml @@ -9,7 +9,9 @@ license.workspace = true publish = false [dependencies] +chrono.workspace = true clap = { workspace = true, features = ["derive"] } +csv.workspace = true [lints] workspace = true diff --git a/tools/congestion-model/src/evaluation/mod.rs b/tools/congestion-model/src/evaluation/mod.rs index bf3d2227145..62467a3c8fe 100644 --- a/tools/congestion-model/src/evaluation/mod.rs +++ b/tools/congestion-model/src/evaluation/mod.rs @@ -1,3 +1,4 @@ +use chrono::{Duration, Utc}; pub use transaction_progress::TransactionStatus; use crate::{GGas, Model, ShardId}; @@ -26,6 +27,9 @@ pub struct Progress { pub failed_transactions: usize, } +// The stats writer can be used to dump stats into a CSV file. 
+pub type StatsWriter = Option>>; + impl Model { pub fn queue_lengths(&self) -> HashMap { let mut out = HashMap::new(); @@ -72,4 +76,53 @@ impl Model { failed_transactions, } } + + pub fn write_stats_header(&self, stats_writer: &mut StatsWriter) { + let Some(stats_writer) = stats_writer else { return }; + + stats_writer.write_field("time").unwrap(); + stats_writer.write_field("round").unwrap(); + + stats_writer.write_field("finished_transactions").unwrap(); + stats_writer.write_field("pending_transactions").unwrap(); + stats_writer.write_field("waiting_transactions").unwrap(); + stats_writer.write_field("failed_transactions").unwrap(); + + for shard_id in self.shard_ids.clone() { + for queue in self.queues.shard_queues(shard_id) { + let field_name = format!("shard_{}_queue_{}", shard_id, queue.name()); + stats_writer.write_field(field_name).unwrap(); + } + } + stats_writer.write_record(None::<&[u8]>).unwrap(); + } + + pub fn write_stats_values( + &self, + stats_writer: &mut StatsWriter, + start_time: chrono::prelude::DateTime, + round: usize, + ) { + let Some(stats_writer) = stats_writer else { return }; + + let time = start_time + Duration::seconds(round as i64); + + stats_writer.write_field(format!("{:?}", time)).unwrap(); + stats_writer.write_field(format!("{}", round)).unwrap(); + + // This is slow and takes up to 10s for the slowest workloads and strategies. 
+ let progress = self.progress(); + stats_writer.write_field(format!("{}", progress.finished_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.pending_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.waiting_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.failed_transactions)).unwrap(); + + for shard_id in self.shard_ids.clone() { + for queue in self.queues.shard_queues(shard_id) { + stats_writer.write_field(format!("{}", queue.len())).unwrap(); + } + } + + stats_writer.write_record(None::<&[u8]>).unwrap(); + } } diff --git a/tools/congestion-model/src/lib.rs b/tools/congestion-model/src/lib.rs index ae3b1597863..d293a060a1d 100644 --- a/tools/congestion-model/src/lib.rs +++ b/tools/congestion-model/src/lib.rs @@ -3,7 +3,7 @@ mod model; pub mod strategy; pub mod workload; -pub use evaluation::{summary_table, TransactionStatus}; +pub use evaluation::{summary_table, StatsWriter, TransactionStatus}; pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId}; pub use strategy::CongestionStrategy; pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder}; diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 9009add222f..52b37d28836 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -1,8 +1,11 @@ +use chrono::Utc; +use std::time::Duration; + use congestion_model::strategy::{GlobalTxStopShard, NewTxLast, NoQueueShard, SimpleBackpressure}; use congestion_model::workload::{ AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer, }; -use congestion_model::{summary_table, CongestionStrategy, Model, PGAS}; +use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PGAS}; use clap::Parser; @@ -23,6 +26,20 @@ struct Args { /// Can be used to select a single strategy or "all" to run all strategies. 
#[clap(long, default_value = "all")] strategy: String, + + /// If enabled the model will write stats into a csv file that can be used + /// to visualize the evaluation of the model over time. + #[clap(long, default_value = "false")] + write_stats: bool, + + /// Optional path to the file where the stats should be saved. By default + /// the stats will be saved to a file named with the prefix "stats", the + /// workload and strategy names concatenated and ".csv" extension. This option can + /// only be used when a single strategy and a single workload are selected + /// otherwise the stats from different evaluations would overwrite each + /// other. + #[clap(long)] + write_stats_filepath: Option, } fn main() { @@ -30,21 +47,66 @@ fn main() { summary_table::print_summary_header(); - let workload_names = parse_workflow_names(args.workload.as_ref()); + let workload_names = parse_workload_names(args.workload.as_ref()); let strategy_names = parse_strategy_names(args.strategy.as_ref()); + if args.write_stats_filepath.is_some() + && (workload_names.len() != 1 || strategy_names.len() != 1) + { + panic!("write_stats_filepath can only be used with single workload and strategy. Parsed {:?} workloads and {:?} strategies. 
", workload_names, strategy_names); + } + for workload_name in &workload_names { for strategy_name in &strategy_names { - run_model(&strategy_name, &workload_name, args.shards, args.rounds); + let stats_writer = parse_stats_writer( + args.write_stats, + args.write_stats_filepath.clone(), + workload_name, + strategy_name, + ); + + run_model(&strategy_name, &workload_name, args.shards, args.rounds, stats_writer); } } } -fn run_model(strategy_name: &str, workload_name: &str, num_shards: usize, num_rounds: usize) { +fn parse_stats_writer( + write_stats: bool, + write_stats_filepath: Option, + workload_name: &String, + strategy_name: &String, +) -> StatsWriter { + if !write_stats { + return None; + } + + let default_path = format!("stats_{}_{}.csv", workload_name, strategy_name); + let path = write_stats_filepath.unwrap_or(default_path); + let stats_writer = Box::new(csv::Writer::from_path(path).unwrap()); + Some(stats_writer) +} + +fn run_model( + strategy_name: &str, + workload_name: &str, + num_shards: usize, + num_rounds: usize, + mut stats_writer: StatsWriter, +) { let strategy = strategy(strategy_name, num_shards); let workload = workload(workload_name); let mut model = Model::new(strategy, workload); - for _ in 0..num_rounds { + + // Set the start time to one hour ago to make it visible by default in + // grafana. Each round is 1 virtual second so one hour is good for + // looking at a maximum of 3600 rounds, beyond that you'll need to customize + // the grafana time range. 
+ let start_time = Utc::now() - Duration::from_secs(1 * 60 * 60); + + model.write_stats_header(&mut stats_writer); + + for round in 0..num_rounds { + model.write_stats_values(&mut stats_writer, start_time, round); model.step(); } summary_table::print_summary_row(&model, workload_name, strategy_name); @@ -82,20 +144,20 @@ fn strategy(strategy_name: &str, num_shards: usize) -> Vec Vec { +fn parse_workload_names(workload_name: &str) -> Vec { let available: Vec = vec!["Balanced".to_string(), "All To One".to_string(), "Linear Imbalance".to_string()]; - if workflow_name == "all" { + if workload_name == "all" { return available; } for name in &available { - if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workflow_name) { + if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workload_name) { return vec![name.to_string()]; } } - panic!("The requested workflow name did not match any available workflows. Requested workflow name {:?}, The available workflows are: {:?}", workflow_name, available); + panic!("The requested workload name did not match any available workloads. 
Requested workload name {:?}, The available workloads are: {:?}", workload_name, available); } fn parse_strategy_names(strategy_name: &str) -> Vec { diff --git a/tools/congestion-model/src/model/queue.rs b/tools/congestion-model/src/model/queue.rs index 59bbddf6a62..636abe02108 100644 --- a/tools/congestion-model/src/model/queue.rs +++ b/tools/congestion-model/src/model/queue.rs @@ -4,12 +4,13 @@ use std::collections::VecDeque; pub struct Queue { shard: ShardId, + name: String, messages: VecDeque, } impl Queue { - pub fn new(shard: ShardId) -> Self { - Self { shard, messages: VecDeque::new() } + pub fn new(shard: ShardId, name: &str) -> Self { + Self { shard, name: name.to_string(), messages: VecDeque::new() } } pub fn size(&self) -> u64 { @@ -19,6 +20,10 @@ impl Queue { pub fn shard(&self) -> ShardId { self.shard } + + pub fn name(&self) -> &String { + &self.name + } } impl std::ops::Deref for Queue { diff --git a/tools/congestion-model/src/model/queue_bundle.rs b/tools/congestion-model/src/model/queue_bundle.rs index 6ecfe652b27..fd69d90d4b9 100644 --- a/tools/congestion-model/src/model/queue_bundle.rs +++ b/tools/congestion-model/src/model/queue_bundle.rs @@ -23,7 +23,7 @@ impl QueueBundle { }; for &shard in shards { - let mailbox = this.new_queue(shard); + let mailbox = this.new_queue(shard, "mailbox"); this.shard_mailbox.insert(shard, mailbox); this.transaction_queues.insert(shard, VecDeque::new()); } @@ -31,9 +31,9 @@ impl QueueBundle { this } - pub fn new_queue(&mut self, shard_id: ShardId) -> QueueId { + pub fn new_queue(&mut self, shard_id: ShardId, name: &str) -> QueueId { let id = self.receipt_queues.len(); - self.receipt_queues.push(Queue::new(shard_id)); + self.receipt_queues.push(Queue::new(shard_id, name)); QueueId(id) } @@ -71,7 +71,7 @@ impl QueueBundle { } impl QueueFactory for QueueBundle { - fn register_queue(&mut self, shard_id: ShardId) -> QueueId { - self.new_queue(shard_id) + fn register_queue(&mut self, shard_id: ShardId, name: &str) -> 
QueueId { + self.new_queue(shard_id, name) } } diff --git a/tools/congestion-model/src/strategy/mod.rs b/tools/congestion-model/src/strategy/mod.rs index 3770924f8cc..4db4c07af93 100644 --- a/tools/congestion-model/src/strategy/mod.rs +++ b/tools/congestion-model/src/strategy/mod.rs @@ -26,5 +26,5 @@ pub trait CongestionStrategy { /// Use this to create queues. pub trait QueueFactory { - fn register_queue(&mut self, to: ShardId) -> QueueId; + fn register_queue(&mut self, to: ShardId, name: &str) -> QueueId; } diff --git a/tools/congestion-model/src/strategy/simple_backpressure.rs b/tools/congestion-model/src/strategy/simple_backpressure.rs index 2e194a89e84..2059502a084 100644 --- a/tools/congestion-model/src/strategy/simple_backpressure.rs +++ b/tools/congestion-model/src/strategy/simple_backpressure.rs @@ -21,7 +21,8 @@ impl crate::CongestionStrategy for SimpleBackpressure { _other_shards: &[crate::ShardId], queue_factory: &mut dyn QueueFactory, ) { - self.delayed_outgoing_receipts = Some(queue_factory.register_queue(id)); + self.delayed_outgoing_receipts = + Some(queue_factory.register_queue(id, "delayed_outgoing_receipts")); self.id = Some(id); }