From d8653a746e3b121e2aad482fc1960ecb7ec0c1b2 Mon Sep 17 00:00:00 2001 From: wacban Date: Thu, 7 Mar 2024 12:51:26 +0000 Subject: [PATCH 1/7] feat(congestion) dump stats to csv file --- Cargo.lock | 1 + tools/congestion-model/Cargo.toml | 1 + tools/congestion-model/src/main.rs | 15 ++++++----- tools/congestion-model/src/model/mod.rs | 27 ++++++++++++++++--- .../src/strategy/global_tx_stop.rs | 5 +++- tools/congestion-model/src/strategy/mod.rs | 16 +++++++++-- .../src/strategy/no_queues.rs | 5 +++- .../src/strategy/simple_backpressure.rs | 18 ++++++++++++- 8 files changed, 74 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1102206f0fc..fe809908e3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1244,6 +1244,7 @@ name = "congestion-model" version = "0.0.0" dependencies = [ "clap", + "csv", ] [[package]] diff --git a/tools/congestion-model/Cargo.toml b/tools/congestion-model/Cargo.toml index d700f63afe7..606d64879f2 100644 --- a/tools/congestion-model/Cargo.toml +++ b/tools/congestion-model/Cargo.toml @@ -10,6 +10,7 @@ publish = false [dependencies] clap = { workspace = true, features = ["derive"] } +csv.workspace = true [lints] workspace = true diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 1d2cf8051fb..07cb98fea42 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -30,7 +30,7 @@ fn main() { summary_table::print_summary_header(); - let workload_names = parse_workflow_names(args.workload.as_ref()); + let workload_names = parse_workload_names(args.workload.as_ref()); let strategy_names = parse_strategy_names(args.strategy.as_ref()); for workload_name in &workload_names { @@ -43,7 +43,10 @@ fn main() { fn run_model(strategy_name: &str, workload_name: &str, num_shards: usize, num_rounds: usize) { let strategy = strategy(strategy_name, num_shards); let workload = workload(workload_name); - let mut model = Model::new(strategy, workload); + let stats_writer = Some(Box::new( + csv::Writer::from_path(format!("stats_{}_{}.csv", workload_name, strategy_name)).unwrap(), + )); + let mut model = Model::new(strategy, workload, stats_writer); for _ in 0..num_rounds { model.step(); } @@ -83,20 +86,20 @@ fn strategy(strategy_name: &str, num_shards: usize) -> Vec Vec { +fn parse_workload_names(workload_name: &str) -> Vec { let available: Vec = vec!["Balanced".to_string(), "All To One".to_string(), "Linear Imbalance".to_string()]; - if workflow_name == "all" { + if workload_name == "all" { return available; } for name in &available { - if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workflow_name) { + if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workload_name) { return vec![name.to_string()]; } } - panic!("The requested workflow name did not match any available workflows. Requested workflow name {:?}, The available workflows are: {:?}", workflow_name, available); + panic!("The requested workload name did not match any available workloads. Requested workload name {:?}, The available workloads are: {:?}", workload_name, available); } fn parse_strategy_names(strategy_name: &str) -> Vec { diff --git a/tools/congestion-model/src/model/mod.rs b/tools/congestion-model/src/model/mod.rs index b4d5f115b85..ee0b669bc5f 100644 --- a/tools/congestion-model/src/model/mod.rs +++ b/tools/congestion-model/src/model/mod.rs @@ -14,6 +14,7 @@ pub use transaction_registry::TransactionId; pub(crate) use transaction::Transaction; +use crate::strategy::StatsWriter; use crate::workload::Producer; use crate::{CongestionStrategy, Round}; use std::collections::BTreeMap; @@ -35,26 +36,37 @@ pub struct Model { // Workload state pub(crate) transactions: TransactionRegistry, pub(crate) producer: Box, + + stats_writer: StatsWriter, } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct ShardId(usize); +pub struct ShardId(pub usize); impl Model { pub fn new( mut shards: Vec>, mut producer: Box, + mut stats_writer: StatsWriter, ) -> Self { let num_shards = shards.len(); let shard_ids: Vec<_> = (0..num_shards).map(ShardId).collect(); let mut queues = QueueBundle::new(&shard_ids); + if let Some(stats_writer) = &mut stats_writer { + stats_writer.write_field("round").unwrap(); + } + for (shard, &id) in shards.iter_mut().zip(&shard_ids) { - shard.init(id, &shard_ids, &mut queues); + shard.init(id, &shard_ids, &mut queues, &mut stats_writer); } producer.init(&shard_ids); + if let Some(stats_writer) = &mut stats_writer { + stats_writer.write_record(None::<&[u8]>).unwrap(); + } + Self { shards, shard_ids, @@ -63,6 +75,7 @@ impl Model { producer, round: 0, queues, + stats_writer, } } @@ -70,6 +83,10 @@ impl Model { pub fn step(&mut self) { self.round += 1; + if let Some(stats_writer) = &mut self.stats_writer { + stats_writer.write_field(format!("{}", self.round)).unwrap(); + } + // Generate new transactions and place them in the per-shard transaction queues. let new_transactions = self.generate_tx_for_round(); for tx_id in new_transactions { @@ -89,13 +106,17 @@ impl Model { self.round, ShardId(i), ); - shard.compute_chunk(&mut ctx); + shard.compute_chunk(&mut ctx, &mut self.stats_writer); let (mut forwarded_receipts, shared_block_info) = ctx.finish(); outgoing.append(&mut forwarded_receipts); next_block.insert(id, shared_block_info); } + if let Some(stats_writer) = &mut self.stats_writer { + stats_writer.write_record(None::<&[u8]>).unwrap(); + } + // Propagate outputs from this round to inputs for the next round. self.block_info = next_block; for receipt in outgoing { diff --git a/tools/congestion-model/src/strategy/global_tx_stop.rs b/tools/congestion-model/src/strategy/global_tx_stop.rs index 0a69d82b113..32aeda3bc56 100644 --- a/tools/congestion-model/src/strategy/global_tx_stop.rs +++ b/tools/congestion-model/src/strategy/global_tx_stop.rs @@ -2,6 +2,8 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{GAS_LIMIT, TX_GAS_LIMIT}; +use super::StatsWriter; + /// Stop all shards from accepting new transactions when a limit of delayed /// receipts is reached in any shard. pub struct GlobalTxStopShard { @@ -18,10 +20,11 @@ impl crate::CongestionStrategy for GlobalTxStopShard { _id: crate::ShardId, _other_shards: &[crate::ShardId], _queue_factory: &mut dyn QueueFactory, + _stats_writer: &mut StatsWriter, ) { } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, _stats_writer: &mut StatsWriter) { let mut any_shard_congested = false; for shard_info in ctx.prev_block_info().values() { if shard_info.get::().unwrap().num_delayed > self.max_delayed_receipts diff --git a/tools/congestion-model/src/strategy/mod.rs b/tools/congestion-model/src/strategy/mod.rs index 82e18961e54..f0d231eba88 100644 --- a/tools/congestion-model/src/strategy/mod.rs +++ b/tools/congestion-model/src/strategy/mod.rs @@ -9,6 +9,12 @@ mod global_tx_stop; mod no_queues; mod simple_backpressure; +// The stats writer can be used to dump stats into a CSV file. +// In the strategy implementation you should write the header in the init method +// and the individual values in the compute_chunk method. Please use the +// write_field. The model will take care of writing the record terminator. +pub type StatsWriter = Option>>; + /// Implement the shard behavior to define a new congestion control strategy. /// /// The model execution will take one `CongestionStrategy` trait object per @@ -16,10 +22,16 @@ mod simple_backpressure; /// the same code on each shard. pub trait CongestionStrategy { /// Initial state and register all necessary queues for one shard. - fn init(&mut self, id: ShardId, other_shards: &[ShardId], queue_factory: &mut dyn QueueFactory); + fn init( + &mut self, + id: ShardId, + other_shards: &[ShardId], + queue_factory: &mut dyn QueueFactory, + stats_writer: &mut StatsWriter, + ); /// Decide which receipts to execute, which to delay, and which to forward. - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext); + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, stats_writer: &mut StatsWriter); } /// Use this to create queues. diff --git a/tools/congestion-model/src/strategy/no_queues.rs b/tools/congestion-model/src/strategy/no_queues.rs index e4fda644d5b..7d1a691bf5e 100644 --- a/tools/congestion-model/src/strategy/no_queues.rs +++ b/tools/congestion-model/src/strategy/no_queues.rs @@ -2,6 +2,8 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{GAS_LIMIT, TX_GAS_LIMIT}; +use super::StatsWriter; + pub struct NoQueueShard {} impl crate::CongestionStrategy for NoQueueShard { @@ -10,10 +12,11 @@ impl crate::CongestionStrategy for NoQueueShard { _id: crate::ShardId, _other_shards: &[crate::ShardId], _queue_factory: &mut dyn QueueFactory, + _stats_writer: &mut StatsWriter, ) { } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, _stats_writer: &mut StatsWriter) { while ctx.gas_burnt() < TX_GAS_LIMIT { if let Some(tx) = ctx.incoming_transactions().pop_front() { let outgoing = ctx.accept_transaction(tx); diff --git a/tools/congestion-model/src/strategy/simple_backpressure.rs b/tools/congestion-model/src/strategy/simple_backpressure.rs index 2e194a89e84..5bd717ebd0e 100644 --- a/tools/congestion-model/src/strategy/simple_backpressure.rs +++ b/tools/congestion-model/src/strategy/simple_backpressure.rs @@ -2,6 +2,8 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{QueueId, Receipt, ShardId, GAS_LIMIT, TX_GAS_LIMIT}; +use super::StatsWriter; + /// Have a fixed max queue size per shard and apply backpressure by stop /// forwarding receipts when a receiving shard has reached its limit. pub struct SimpleBackpressure { @@ -20,12 +22,26 @@ impl crate::CongestionStrategy for SimpleBackpressure { id: crate::ShardId, _other_shards: &[crate::ShardId], queue_factory: &mut dyn QueueFactory, + stats_writer: &mut StatsWriter, ) { self.delayed_outgoing_receipts = Some(queue_factory.register_queue(id)); self.id = Some(id); + + if let Some(stats_writer) = stats_writer { + stats_writer.write_field(format!("shard_{}_incoming_queue", id.0)).unwrap(); + stats_writer.write_field(format!("shard_{}_outgoing_queue", id.0)).unwrap(); + } } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, stats_writer: &mut StatsWriter) { + if let Some(stats_writer) = stats_writer { + let incoming_queue_len = ctx.incoming_receipts().len(); + let outgoing_queue_len = ctx.queue(self.delayed_outgoing_receipts.unwrap()).len(); + + stats_writer.write_field(format!("{}", incoming_queue_len)).unwrap(); + stats_writer.write_field(format!("{}", outgoing_queue_len)).unwrap(); + } + // first attempt forwarding previously buffered outgoing receipts let buffered: Vec<_> = ctx.queue(self.delayed_outgoing_receipts.unwrap()).drain(..).collect(); From 199c68412c0e9e2c73235255fda49cb816456ef5 Mon Sep 17 00:00:00 2001 From: wacban Date: Thu, 7 Mar 2024 13:03:11 +0000 Subject: [PATCH 2/7] make it optional --- tools/congestion-model/src/main.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 07cb98fea42..3f7028083d6 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -23,6 +23,9 @@ struct Args { /// Can be used to select a single strategy or "all" to run all strategies. #[clap(long, default_value = "all")] strategy: String, + + #[clap(long, default_value = "false")] + write_stats: bool, } fn main() { @@ -35,17 +38,24 @@ fn main() { for workload_name in &workload_names { for strategy_name in &strategy_names { - run_model(&strategy_name, &workload_name, args.shards, args.rounds); + run_model(&strategy_name, &workload_name, args.shards, args.rounds, args.write_stats); } } } -fn run_model(strategy_name: &str, workload_name: &str, num_shards: usize, num_rounds: usize) { +fn run_model( + strategy_name: &str, + workload_name: &str, + num_shards: usize, + num_rounds: usize, + write_stats: bool, +) { let strategy = strategy(strategy_name, num_shards); let workload = workload(workload_name); - let stats_writer = Some(Box::new( - csv::Writer::from_path(format!("stats_{}_{}.csv", workload_name, strategy_name)).unwrap(), - )); + let stats_writer = write_stats.then(|| { + let path = format!("stats_{}_{}.csv", workload_name, strategy_name); + Box::new(csv::Writer::from_path(path).unwrap()) + }); let mut model = Model::new(strategy, workload, stats_writer); for _ in 0..num_rounds { model.step(); From 3cce98b13d9f08739114a028ed7502f07258c97c Mon Sep 17 00:00:00 2001 From: wacban Date: Thu, 7 Mar 2024 15:22:29 +0100 Subject: [PATCH 3/7] added time column --- Cargo.lock | 1 + tools/congestion-model/Cargo.toml | 1 + tools/congestion-model/src/model/mod.rs | 13 +++++++++++++ 3 files changed, 15 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index fe809908e3e..279b1ac2716 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,6 +1243,7 @@ dependencies = [ name = "congestion-model" version = "0.0.0" dependencies = [ + "chrono", "clap", "csv", ] diff --git a/tools/congestion-model/Cargo.toml b/tools/congestion-model/Cargo.toml index 606d64879f2..d0c035b4cdf 100644 --- a/tools/congestion-model/Cargo.toml +++ b/tools/congestion-model/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true publish = false [dependencies] +chrono.workspace = true clap = { workspace = true, features = ["derive"] } csv.workspace = true diff --git a/tools/congestion-model/src/model/mod.rs b/tools/congestion-model/src/model/mod.rs index ee0b669bc5f..d7b0d7de333 100644 --- a/tools/congestion-model/src/model/mod.rs +++ b/tools/congestion-model/src/model/mod.rs @@ -6,6 +6,7 @@ mod transaction; mod transaction_registry; pub use block_info::BlockInfo; +use chrono::{DateTime, Utc}; pub use chunk_execution::*; pub use queue::*; pub use queue_bundle::*; @@ -18,6 +19,7 @@ use crate::strategy::StatsWriter; use crate::workload::Producer; use crate::{CongestionStrategy, Round}; use std::collections::BTreeMap; +use std::time::Duration; use transaction_registry::TransactionRegistry; pub struct Model { @@ -38,6 +40,7 @@ pub struct Model { pub(crate) producer: Box, stats_writer: StatsWriter, + start_time: DateTime, } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -49,11 +52,17 @@ impl Model { mut producer: Box, mut stats_writer: StatsWriter, ) -> Self { + // Set the start time to an hour ago to make it visible by default in + // grafana. Each round is 1 virtual second so 1 hour is good for looking + // at a maximum of 3600 rounds, beyond that you'll need to customize the + // grafana time range. + let start_time = Utc::now() - Duration::from_secs(1 * 60 * 60); let num_shards = shards.len(); let shard_ids: Vec<_> = (0..num_shards).map(ShardId).collect(); let mut queues = QueueBundle::new(&shard_ids); if let Some(stats_writer) = &mut stats_writer { + stats_writer.write_field("time").unwrap(); stats_writer.write_field("round").unwrap(); } @@ -76,6 +85,7 @@ impl Model { round: 0, queues, stats_writer, + start_time, } } @@ -84,6 +94,9 @@ impl Model { self.round += 1; if let Some(stats_writer) = &mut self.stats_writer { + let time = self.start_time + Duration::from_secs(self.round); + + stats_writer.write_field(format!("{:?}", time)).unwrap(); stats_writer.write_field(format!("{}", self.round)).unwrap(); } From 30ccf146e2f829a512710cfd248deb38805a00e0 Mon Sep 17 00:00:00 2001 From: wacban Date: Sun, 10 Mar 2024 18:32:46 +0100 Subject: [PATCH 4/7] new implementation --- tools/congestion-model/src/evaluation/mod.rs | 56 +++++++++++++++ tools/congestion-model/src/lib.rs | 2 +- tools/congestion-model/src/main.rs | 68 ++++++++++++++++--- tools/congestion-model/src/model/mod.rs | 38 +---------- tools/congestion-model/src/model/queue.rs | 9 ++- .../src/model/queue_bundle.rs | 10 +-- .../src/strategy/global_tx_stop.rs | 5 +- tools/congestion-model/src/strategy/mod.rs | 18 +---- .../src/strategy/no_queues.rs | 5 +- .../src/strategy/simple_backpressure.rs | 26 +++---- 10 files changed, 145 insertions(+), 92 deletions(-) diff --git a/tools/congestion-model/src/evaluation/mod.rs b/tools/congestion-model/src/evaluation/mod.rs index bf3d2227145..b99ff57b769 100644 --- a/tools/congestion-model/src/evaluation/mod.rs +++ b/tools/congestion-model/src/evaluation/mod.rs @@ -1,3 +1,4 @@ +use chrono::{Duration, Utc}; pub use transaction_progress::TransactionStatus; use crate::{GGas, Model, ShardId}; @@ -26,6 +27,12 @@ pub struct Progress { pub failed_transactions: usize, } +// The stats writer can be used to dump stats into a CSV file. +// In the strategy implementation you should write the header in the init method +// and the individual values in the compute_chunk method. Please use the +// write_field. The model will take care of writing the record terminator. +pub type StatsWriter = Option>>; + impl Model { pub fn queue_lengths(&self) -> HashMap { let mut out = HashMap::new(); @@ -72,4 +79,53 @@ impl Model { failed_transactions, } } + + pub fn write_stats_header(&self, stats_writer: &mut StatsWriter) { + let Some(stats_writer) = stats_writer else { return }; + + stats_writer.write_field("time").unwrap(); + stats_writer.write_field("round").unwrap(); + + stats_writer.write_field("finished_transactions").unwrap(); + stats_writer.write_field("pending_transactions").unwrap(); + stats_writer.write_field("waiting_transactions").unwrap(); + stats_writer.write_field("failed_transactions").unwrap(); + + for shard_id in self.shard_ids.clone() { + for queue in self.queues.shard_queues(shard_id) { + let field_name = format!("shard_{}_queue_{}", shard_id, queue.name()); + stats_writer.write_field(field_name).unwrap(); + } + } + stats_writer.write_record(None::<&[u8]>).unwrap(); + } + + pub fn write_stats_values( + &self, + stats_writer: &mut StatsWriter, + start_time: chrono::prelude::DateTime, + round: usize, + ) { + let Some(stats_writer) = stats_writer else { return }; + + let time = start_time + Duration::seconds(round as i64); + + stats_writer.write_field(format!("{:?}", time)).unwrap(); + stats_writer.write_field(format!("{}", round)).unwrap(); + + // this may be slow + let progress = self.progress(); + stats_writer.write_field(format!("{}", progress.finished_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.pending_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.waiting_transactions)).unwrap(); + stats_writer.write_field(format!("{}", progress.failed_transactions)).unwrap(); + + for shard_id in self.shard_ids.clone() { + for queue in self.queues.shard_queues(shard_id) { + stats_writer.write_field(format!("{}", queue.len())).unwrap(); + } + } + + stats_writer.write_record(None::<&[u8]>).unwrap(); + } } diff --git a/tools/congestion-model/src/lib.rs b/tools/congestion-model/src/lib.rs index ae3b1597863..d293a060a1d 100644 --- a/tools/congestion-model/src/lib.rs +++ b/tools/congestion-model/src/lib.rs @@ -3,7 +3,7 @@ mod model; pub mod strategy; pub mod workload; -pub use evaluation::{summary_table, TransactionStatus}; +pub use evaluation::{summary_table, StatsWriter, TransactionStatus}; pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId}; pub use strategy::CongestionStrategy; pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder}; diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 3f7028083d6..1257da2d137 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -1,11 +1,16 @@ +use std::time::Duration; + +use chrono::Utc; use congestion_model::strategy::{GlobalTxStopShard, NoQueueShard, SimpleBackpressure}; use congestion_model::workload::{ AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer, }; -use congestion_model::{summary_table, CongestionStrategy, Model, PGAS}; +use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PGAS}; use clap::Parser; +// pub type StatsWriter = Option>>; + #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -24,8 +29,18 @@ struct Args { #[clap(long, default_value = "all")] strategy: String, + /// If enabled the model will write stats into a csv file that can be used + /// to visualize the evaluation of the model over time. #[clap(long, default_value = "false")] write_stats: bool, + + /// Optional path the the file where the stats should be saved. By default + /// the stats will be saved to a file name with the strategy and workload + /// name concatenated with "stats.csv". This option can only be used when a + /// single strategy and workflow are selected otherwise the stats from + /// different evaluations would overwrite each other. + #[clap(long)] + write_stats_filepath: Option, } fn main() { @@ -36,28 +51,63 @@ fn main() { let workload_names = parse_workload_names(args.workload.as_ref()); let strategy_names = parse_strategy_names(args.strategy.as_ref()); + if args.write_stats_filepath.is_some() + && (workload_names.len() != 1 || strategy_names.len() != 1) + { + panic!("write_stats_filepath can only be used with single workload and strategy. Parsed {:?} workloads and {:?} strategies. ", workload_names, strategy_names); + } + for workload_name in &workload_names { for strategy_name in &strategy_names { - run_model(&strategy_name, &workload_name, args.shards, args.rounds, args.write_stats); + let stats_writer = parse_stats_writer( + args.write_stats, + args.write_stats_filepath.clone(), + workload_name, + strategy_name, + ); + + run_model(&strategy_name, &workload_name, args.shards, args.rounds, stats_writer); } } } +fn parse_stats_writer( + write_stats: bool, + write_stats_filepath: Option, + workload_name: &String, + strategy_name: &String, +) -> StatsWriter { + if !write_stats { + return None; + } + + let default_path = format!("stats_{}_{}.csv", workload_name, strategy_name); + let path = write_stats_filepath.unwrap_or(default_path); + let stats_writer = Box::new(csv::Writer::from_path(path).unwrap()); + Some(stats_writer) +} + fn run_model( strategy_name: &str, workload_name: &str, num_shards: usize, num_rounds: usize, - write_stats: bool, + mut stats_writer: StatsWriter, ) { let strategy = strategy(strategy_name, num_shards); let workload = workload(workload_name); - let stats_writer = write_stats.then(|| { - let path = format!("stats_{}_{}.csv", workload_name, strategy_name); - Box::new(csv::Writer::from_path(path).unwrap()) - }); - let mut model = Model::new(strategy, workload, stats_writer); - for _ in 0..num_rounds { + let mut model = Model::new(strategy, workload); + + // Set the start time to an half hour ago to make it visible by default in + // grafana. Each round is 1 virtual second so hald an hour is good for + // looking at a maximum of 1800 rounds, beyond that you'll need to customize + // the grafana time range. + let start_time = Utc::now() - Duration::from_secs(1 * 60 * 60); + + model.write_stats_header(&mut stats_writer); + + for round in 0..num_rounds { + model.write_stats_values(&mut stats_writer, start_time, round); model.step(); } summary_table::print_summary_row(&model, workload_name, strategy_name); diff --git a/tools/congestion-model/src/model/mod.rs b/tools/congestion-model/src/model/mod.rs index d7b0d7de333..676509ef68f 100644 --- a/tools/congestion-model/src/model/mod.rs +++ b/tools/congestion-model/src/model/mod.rs @@ -6,7 +6,6 @@ mod transaction; mod transaction_registry; pub use block_info::BlockInfo; -use chrono::{DateTime, Utc}; pub use chunk_execution::*; pub use queue::*; pub use queue_bundle::*; @@ -15,11 +14,9 @@ pub use transaction_registry::TransactionId; pub(crate) use transaction::Transaction; -use crate::strategy::StatsWriter; use crate::workload::Producer; use crate::{CongestionStrategy, Round}; use std::collections::BTreeMap; -use std::time::Duration; use transaction_registry::TransactionRegistry; pub struct Model { @@ -38,9 +35,6 @@ pub struct Model { // Workload state pub(crate) transactions: TransactionRegistry, pub(crate) producer: Box, - - stats_writer: StatsWriter, - start_time: DateTime, } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -50,32 +44,17 @@ impl Model { pub fn new( mut shards: Vec>, mut producer: Box, - mut stats_writer: StatsWriter, ) -> Self { - // Set the start time to an hour ago to make it visible by default in - // grafana. Each round is 1 virtual second so 1 hour is good for looking - // at a maximum of 3600 rounds, beyond that you'll need to customize the - // grafana time range. - let start_time = Utc::now() - Duration::from_secs(1 * 60 * 60); let num_shards = shards.len(); let shard_ids: Vec<_> = (0..num_shards).map(ShardId).collect(); let mut queues = QueueBundle::new(&shard_ids); - if let Some(stats_writer) = &mut stats_writer { - stats_writer.write_field("time").unwrap(); - stats_writer.write_field("round").unwrap(); - } - for (shard, &id) in shards.iter_mut().zip(&shard_ids) { - shard.init(id, &shard_ids, &mut queues, &mut stats_writer); + shard.init(id, &shard_ids, &mut queues); } producer.init(&shard_ids); - if let Some(stats_writer) = &mut stats_writer { - stats_writer.write_record(None::<&[u8]>).unwrap(); - } - Self { shards, shard_ids, @@ -84,8 +63,6 @@ impl Model { producer, round: 0, queues, - stats_writer, - start_time, } } @@ -93,13 +70,6 @@ impl Model { pub fn step(&mut self) { self.round += 1; - if let Some(stats_writer) = &mut self.stats_writer { - let time = self.start_time + Duration::from_secs(self.round); - - stats_writer.write_field(format!("{:?}", time)).unwrap(); - stats_writer.write_field(format!("{}", self.round)).unwrap(); - } - // Generate new transactions and place them in the per-shard transaction queues. let new_transactions = self.generate_tx_for_round(); for tx_id in new_transactions { @@ -119,17 +89,13 @@ impl Model { self.round, ShardId(i), ); - shard.compute_chunk(&mut ctx, &mut self.stats_writer); + shard.compute_chunk(&mut ctx); let (mut forwarded_receipts, shared_block_info) = ctx.finish(); outgoing.append(&mut forwarded_receipts); next_block.insert(id, shared_block_info); } - if let Some(stats_writer) = &mut self.stats_writer { - stats_writer.write_record(None::<&[u8]>).unwrap(); - } - // Propagate outputs from this round to inputs for the next round. self.block_info = next_block; for receipt in outgoing { diff --git a/tools/congestion-model/src/model/queue.rs b/tools/congestion-model/src/model/queue.rs index 59bbddf6a62..636abe02108 100644 --- a/tools/congestion-model/src/model/queue.rs +++ b/tools/congestion-model/src/model/queue.rs @@ -4,12 +4,13 @@ use std::collections::VecDeque; pub struct Queue { shard: ShardId, + name: String, messages: VecDeque, } impl Queue { - pub fn new(shard: ShardId) -> Self { - Self { shard, messages: VecDeque::new() } + pub fn new(shard: ShardId, name: &str) -> Self { + Self { shard, name: name.to_string(), messages: VecDeque::new() } } pub fn size(&self) -> u64 { @@ -19,6 +20,10 @@ impl Queue { pub fn shard(&self) -> ShardId { self.shard } + + pub fn name(&self) -> &String { + &self.name + } } impl std::ops::Deref for Queue { diff --git a/tools/congestion-model/src/model/queue_bundle.rs b/tools/congestion-model/src/model/queue_bundle.rs index 6ecfe652b27..fd69d90d4b9 100644 --- a/tools/congestion-model/src/model/queue_bundle.rs +++ b/tools/congestion-model/src/model/queue_bundle.rs @@ -23,7 +23,7 @@ impl QueueBundle { }; for &shard in shards { - let mailbox = this.new_queue(shard); + let mailbox = this.new_queue(shard, "mailbox"); this.shard_mailbox.insert(shard, mailbox); this.transaction_queues.insert(shard, VecDeque::new()); } @@ -31,9 +31,9 @@ impl QueueBundle { this } - pub fn new_queue(&mut self, shard_id: ShardId) -> QueueId { + pub fn new_queue(&mut self, shard_id: ShardId, name: &str) -> QueueId { let id = self.receipt_queues.len(); - self.receipt_queues.push(Queue::new(shard_id)); + self.receipt_queues.push(Queue::new(shard_id, name)); QueueId(id) } @@ -71,7 +71,7 @@ impl QueueBundle { } impl QueueFactory for QueueBundle { - fn register_queue(&mut self, shard_id: ShardId) -> QueueId { - self.new_queue(shard_id) + fn register_queue(&mut self, shard_id: ShardId, name: &str) -> QueueId { + self.new_queue(shard_id, name) } } diff --git a/tools/congestion-model/src/strategy/global_tx_stop.rs b/tools/congestion-model/src/strategy/global_tx_stop.rs index 32aeda3bc56..0a69d82b113 100644 --- a/tools/congestion-model/src/strategy/global_tx_stop.rs +++ b/tools/congestion-model/src/strategy/global_tx_stop.rs @@ -2,8 +2,6 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{GAS_LIMIT, TX_GAS_LIMIT}; -use super::StatsWriter; - /// Stop all shards from accepting new transactions when a limit of delayed /// receipts is reached in any shard. pub struct GlobalTxStopShard { @@ -20,11 +18,10 @@ impl crate::CongestionStrategy for GlobalTxStopShard { _id: crate::ShardId, _other_shards: &[crate::ShardId], _queue_factory: &mut dyn QueueFactory, - _stats_writer: &mut StatsWriter, ) { } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, _stats_writer: &mut StatsWriter) { + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { let mut any_shard_congested = false; for shard_info in ctx.prev_block_info().values() { if shard_info.get::().unwrap().num_delayed > self.max_delayed_receipts diff --git a/tools/congestion-model/src/strategy/mod.rs b/tools/congestion-model/src/strategy/mod.rs index f0d231eba88..e8181e5ef3a 100644 --- a/tools/congestion-model/src/strategy/mod.rs +++ b/tools/congestion-model/src/strategy/mod.rs @@ -9,12 +9,6 @@ mod global_tx_stop; mod no_queues; mod simple_backpressure; -// The stats writer can be used to dump stats into a CSV file. -// In the strategy implementation you should write the header in the init method -// and the individual values in the compute_chunk method. Please use the -// write_field. The model will take care of writing the record terminator. -pub type StatsWriter = Option>>; - /// Implement the shard behavior to define a new congestion control strategy. /// /// The model execution will take one `CongestionStrategy` trait object per @@ -22,19 +16,13 @@ pub type StatsWriter = Option>>; /// the same code on each shard. pub trait CongestionStrategy { /// Initial state and register all necessary queues for one shard. - fn init( - &mut self, - id: ShardId, - other_shards: &[ShardId], - queue_factory: &mut dyn QueueFactory, - stats_writer: &mut StatsWriter, - ); + fn init(&mut self, id: ShardId, other_shards: &[ShardId], queue_factory: &mut dyn QueueFactory); /// Decide which receipts to execute, which to delay, and which to forward. - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, stats_writer: &mut StatsWriter); + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext); } /// Use this to create queues. pub trait QueueFactory { - fn register_queue(&mut self, to: ShardId) -> QueueId; + fn register_queue(&mut self, to: ShardId, name: &str) -> QueueId; } diff --git a/tools/congestion-model/src/strategy/no_queues.rs b/tools/congestion-model/src/strategy/no_queues.rs index 7d1a691bf5e..e4fda644d5b 100644 --- a/tools/congestion-model/src/strategy/no_queues.rs +++ b/tools/congestion-model/src/strategy/no_queues.rs @@ -2,8 +2,6 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{GAS_LIMIT, TX_GAS_LIMIT}; -use super::StatsWriter; - pub struct NoQueueShard {} impl crate::CongestionStrategy for NoQueueShard { @@ -12,11 +10,10 @@ impl crate::CongestionStrategy for NoQueueShard { _id: crate::ShardId, _other_shards: &[crate::ShardId], _queue_factory: &mut dyn QueueFactory, - _stats_writer: &mut StatsWriter, ) { } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, _stats_writer: &mut StatsWriter) { + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { while ctx.gas_burnt() < TX_GAS_LIMIT { if let Some(tx) = ctx.incoming_transactions().pop_front() { let outgoing = ctx.accept_transaction(tx); diff --git a/tools/congestion-model/src/strategy/simple_backpressure.rs b/tools/congestion-model/src/strategy/simple_backpressure.rs index 5bd717ebd0e..c76aba50f99 100644 --- a/tools/congestion-model/src/strategy/simple_backpressure.rs +++ b/tools/congestion-model/src/strategy/simple_backpressure.rs @@ -2,8 +2,6 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{QueueId, Receipt, ShardId, GAS_LIMIT, TX_GAS_LIMIT}; -use super::StatsWriter; - /// Have a fixed max queue size per shard and apply backpressure by stop /// forwarding receipts when a receiving shard has reached its limit. pub struct SimpleBackpressure { @@ -22,25 +20,21 @@ impl crate::CongestionStrategy for SimpleBackpressure { id: crate::ShardId, _other_shards: &[crate::ShardId], queue_factory: &mut dyn QueueFactory, - stats_writer: &mut StatsWriter, ) { - self.delayed_outgoing_receipts = Some(queue_factory.register_queue(id)); + self.delayed_outgoing_receipts = + Some(queue_factory.register_queue(id, "delayed_outgoing_receipts")); self.id = Some(id); - - if let Some(stats_writer) = stats_writer { - stats_writer.write_field(format!("shard_{}_incoming_queue", id.0)).unwrap(); - stats_writer.write_field(format!("shard_{}_outgoing_queue", id.0)).unwrap(); - } } - fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext, stats_writer: &mut StatsWriter) { - if let Some(stats_writer) = stats_writer { - let incoming_queue_len = ctx.incoming_receipts().len(); - let outgoing_queue_len = ctx.queue(self.delayed_outgoing_receipts.unwrap()).len(); + fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { + // TODO(wacban) + // if let Some(stats_writer) = stats_writer { + // let incoming_queue_len = ctx.incoming_receipts().len(); + // let outgoing_queue_len = ctx.queue(self.delayed_outgoing_receipts.unwrap()).len(); - stats_writer.write_field(format!("{}", incoming_queue_len)).unwrap(); - stats_writer.write_field(format!("{}", outgoing_queue_len)).unwrap(); - } + // stats_writer.write_field(format!("{}", incoming_queue_len)).unwrap(); + // stats_writer.write_field(format!("{}", outgoing_queue_len)).unwrap(); + // } // first attempt forwarding previously buffered outgoing receipts let buffered: Vec<_> = From 043cdca98f121a5fda9fcb1f80615e35f8644ad8 Mon Sep 17 00:00:00 2001 From: wacban Date: Sun, 10 Mar 2024 18:43:50 +0100 Subject: [PATCH 5/7] self review --- tools/congestion-model/src/evaluation/mod.rs | 5 +---- tools/congestion-model/src/main.rs | 11 +++++------ .../src/strategy/simple_backpressure.rs | 9 --------- 3 files changed, 6 insertions(+), 19 deletions(-) diff --git a/tools/congestion-model/src/evaluation/mod.rs b/tools/congestion-model/src/evaluation/mod.rs index b99ff57b769..62467a3c8fe 100644 --- a/tools/congestion-model/src/evaluation/mod.rs +++ b/tools/congestion-model/src/evaluation/mod.rs @@ -28,9 +28,6 @@ pub struct Progress { } // The stats writer can be used to dump stats into a CSV file. -// In the strategy implementation you should write the header in the init method -// and the individual values in the compute_chunk method. Please use the -// write_field. The model will take care of writing the record terminator. pub type StatsWriter = Option>>; impl Model { @@ -113,7 +110,7 @@ impl Model { stats_writer.write_field(format!("{:?}", time)).unwrap(); stats_writer.write_field(format!("{}", round)).unwrap(); - // this may be slow + // This is slow and takes up to 10s for the slowest workloads and strategies. let progress = self.progress(); stats_writer.write_field(format!("{}", progress.finished_transactions)).unwrap(); stats_writer.write_field(format!("{}", progress.pending_transactions)).unwrap(); diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 1257da2d137..61c6fe25696 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -9,8 +9,6 @@ use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PG use clap::Parser; -// pub type StatsWriter = Option>>; - #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -35,10 +33,11 @@ struct Args { write_stats: bool, /// Optional path the the file where the stats should be saved. By default - /// the stats will be saved to a file name with the strategy and workload - /// name concatenated with "stats.csv". This option can only be used when a - /// single strategy and workflow are selected otherwise the stats from - /// different evaluations would overwrite each other. + /// the stats will be saved to a file name with prefix "stats", the strategy + /// and workload name concatenated and ".csv" extension. This option can + /// only be used when a single strategy and a single workflow are selected + /// otherwise the stats from different evaluations would overwrite each + /// other. #[clap(long)] write_stats_filepath: Option, } diff --git a/tools/congestion-model/src/strategy/simple_backpressure.rs b/tools/congestion-model/src/strategy/simple_backpressure.rs index c76aba50f99..2059502a084 100644 --- a/tools/congestion-model/src/strategy/simple_backpressure.rs +++ b/tools/congestion-model/src/strategy/simple_backpressure.rs @@ -27,15 +27,6 @@ impl crate::CongestionStrategy for SimpleBackpressure { } fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) { - // TODO(wacban) - // if let Some(stats_writer) = stats_writer { - // let incoming_queue_len = ctx.incoming_receipts().len(); - // let outgoing_queue_len = ctx.queue(self.delayed_outgoing_receipts.unwrap()).len(); - - // stats_writer.write_field(format!("{}", incoming_queue_len)).unwrap(); - // stats_writer.write_field(format!("{}", outgoing_queue_len)).unwrap(); - // } - // first attempt forwarding previously buffered outgoing receipts let buffered: Vec<_> = ctx.queue(self.delayed_outgoing_receipts.unwrap()).drain(..).collect(); From 300f8b6df04db4e8afff853449966a8eb4e254ed Mon Sep 17 00:00:00 2001 From: wacban Date: Mon, 11 Mar 2024 09:36:21 +0100 Subject: [PATCH 6/7] un-pub shard id --- tools/congestion-model/src/model/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/congestion-model/src/model/mod.rs b/tools/congestion-model/src/model/mod.rs index 676509ef68f..b4d5f115b85 100644 --- a/tools/congestion-model/src/model/mod.rs +++ b/tools/congestion-model/src/model/mod.rs @@ -38,7 +38,7 @@ pub struct Model { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct ShardId(pub usize); +pub struct ShardId(usize); impl Model { pub fn new( From 55b9873355105dab3be8fb13fc6d414449b664b6 Mon Sep 17 00:00:00 2001 From: wacban Date: Mon, 11 Mar 2024 10:08:20 +0100 Subject: [PATCH 7/7] cargo fmt --- tools/congestion-model/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 07feaabf877..52b37d28836 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -1,5 +1,5 @@ -use std::time::Duration; use chrono::Utc; +use std::time::Duration; use congestion_model::strategy::{GlobalTxStopShard, NewTxLast, NoQueueShard, SimpleBackpressure}; use congestion_model::workload::{