feat(congestion): write stats to a csv file #10719

Merged (8 commits) on Mar 11, 2024
Changes from 5 commits
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions tools/congestion-model/Cargo.toml
@@ -9,7 +9,9 @@ license.workspace = true
publish = false

[dependencies]
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
csv.workspace = true

[lints]
workspace = true
53 changes: 53 additions & 0 deletions tools/congestion-model/src/evaluation/mod.rs
@@ -1,3 +1,4 @@
use chrono::{Duration, Utc};
pub use transaction_progress::TransactionStatus;

use crate::{GGas, Model, ShardId};
@@ -26,6 +27,9 @@ pub struct Progress {
pub failed_transactions: usize,
}

// The stats writer can be used to dump stats into a CSV file.
pub type StatsWriter = Option<Box<csv::Writer<std::fs::File>>>;

impl Model {
pub fn queue_lengths(&self) -> HashMap<ShardId, ShardQueueLengths> {
let mut out = HashMap::new();
@@ -72,4 +76,53 @@ impl Model {
failed_transactions,
}
}

pub fn write_stats_header(&self, stats_writer: &mut StatsWriter) {
let Some(stats_writer) = stats_writer else { return };

stats_writer.write_field("time").unwrap();
stats_writer.write_field("round").unwrap();

stats_writer.write_field("finished_transactions").unwrap();
stats_writer.write_field("pending_transactions").unwrap();
stats_writer.write_field("waiting_transactions").unwrap();
stats_writer.write_field("failed_transactions").unwrap();

for shard_id in self.shard_ids.clone() {
for queue in self.queues.shard_queues(shard_id) {
let field_name = format!("shard_{}_queue_{}", shard_id, queue.name());
stats_writer.write_field(field_name).unwrap();
}
}
stats_writer.write_record(None::<&[u8]>).unwrap();
}

pub fn write_stats_values(
&self,
stats_writer: &mut StatsWriter,
start_time: chrono::prelude::DateTime<Utc>,
round: usize,
) {
let Some(stats_writer) = stats_writer else { return };

let time = start_time + Duration::seconds(round as i64);

stats_writer.write_field(format!("{:?}", time)).unwrap();
stats_writer.write_field(format!("{}", round)).unwrap();

// This is slow and takes up to 10s for the slowest workloads and strategies.
let progress = self.progress();
stats_writer.write_field(format!("{}", progress.finished_transactions)).unwrap();
stats_writer.write_field(format!("{}", progress.pending_transactions)).unwrap();
stats_writer.write_field(format!("{}", progress.waiting_transactions)).unwrap();
stats_writer.write_field(format!("{}", progress.failed_transactions)).unwrap();

for shard_id in self.shard_ids.clone() {
for queue in self.queues.shard_queues(shard_id) {
stats_writer.write_field(format!("{}", queue.len())).unwrap();
}
}

stats_writer.write_record(None::<&[u8]>).unwrap();
}
}
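
The column layout that `write_stats_header` and `write_stats_values` produce can be sketched without the `csv` crate. This is a minimal illustration, not the PR's code: `QueueStat`, `header_row`, and `value_row` are hypothetical names, and the rows are joined naively with commas, with none of the quoting or escaping that `csv::Writer` performs. The point is that both rows walk the queues in the same order, so every value lands under its own header.

```rust
/// Hypothetical stand-in for one of the model's queues:
/// the shard it belongs to, its name, and its current length.
pub struct QueueStat {
    pub shard: usize,
    pub name: String,
    pub len: usize,
}

/// Fixed columns that precede the per-queue columns, in the order used by the PR.
const FIXED_COLUMNS: [&str; 6] = [
    "time",
    "round",
    "finished_transactions",
    "pending_transactions",
    "waiting_transactions",
    "failed_transactions",
];

/// Builds the header row: fixed columns first, then one column per queue.
pub fn header_row(queues: &[QueueStat]) -> String {
    let mut fields: Vec<String> = FIXED_COLUMNS.iter().map(|s| s.to_string()).collect();
    for queue in queues {
        fields.push(format!("shard_{}_queue_{}", queue.shard, queue.name));
    }
    fields.join(",")
}

/// Builds one value row. `progress` holds the finished, pending, waiting and
/// failed transaction counts, in that order. The queues are visited in the
/// same order as in `header_row`, which keeps the columns aligned.
pub fn value_row(time: &str, round: usize, progress: [usize; 4], queues: &[QueueStat]) -> String {
    let mut fields: Vec<String> = vec![time.to_string(), round.to_string()];
    fields.extend(progress.iter().map(|count| count.to_string()));
    fields.extend(queues.iter().map(|queue| queue.len.to_string()));
    fields.join(",")
}
```

With two shards that each have a single "mailbox" queue, the header ends in `shard_0_queue_mailbox,shard_1_queue_mailbox`, and each value row ends in the two matching queue lengths.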
2 changes: 1 addition & 1 deletion tools/congestion-model/src/lib.rs
@@ -3,7 +3,7 @@ mod model;
pub mod strategy;
pub mod workload;

pub use evaluation::{summary_table, TransactionStatus};
pub use evaluation::{summary_table, StatsWriter, TransactionStatus};
pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId};
pub use strategy::CongestionStrategy;
pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder};
80 changes: 71 additions & 9 deletions tools/congestion-model/src/main.rs
@@ -1,8 +1,11 @@
use std::time::Duration;

use chrono::Utc;
use congestion_model::strategy::{GlobalTxStopShard, NoQueueShard, SimpleBackpressure};
use congestion_model::workload::{
AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer,
};
use congestion_model::{summary_table, CongestionStrategy, Model, PGAS};
use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PGAS};

use clap::Parser;

@@ -23,28 +26,87 @@ struct Args {
/// Can be used to select a single strategy or "all" to run all strategies.
#[clap(long, default_value = "all")]
strategy: String,

/// If enabled the model will write stats into a csv file that can be used
/// to visualize the evaluation of the model over time.
#[clap(long, default_value = "false")]
write_stats: bool,

/// Optional path to the file where the stats should be saved. By default
/// the stats are saved to a file named "stats_<workload>_<strategy>.csv".
/// This option can only be used when a single strategy and a single
/// workload are selected; otherwise the stats from different evaluations
/// would overwrite each other.
#[clap(long)]
write_stats_filepath: Option<String>,
}

fn main() {
let args = Args::parse();

summary_table::print_summary_header();

let workload_names = parse_workflow_names(args.workload.as_ref());
let workload_names = parse_workload_names(args.workload.as_ref());
let strategy_names = parse_strategy_names(args.strategy.as_ref());

if args.write_stats_filepath.is_some()
&& (workload_names.len() != 1 || strategy_names.len() != 1)
{
panic!("write_stats_filepath can only be used with a single workload and a single strategy. Parsed {:?} workloads and {:?} strategies.", workload_names, strategy_names);
}

for workload_name in &workload_names {
for strategy_name in &strategy_names {
run_model(&strategy_name, &workload_name, args.shards, args.rounds);
let stats_writer = parse_stats_writer(
args.write_stats,
args.write_stats_filepath.clone(),
workload_name,
strategy_name,
);

run_model(&strategy_name, &workload_name, args.shards, args.rounds, stats_writer);
}
}
}

fn run_model(strategy_name: &str, workload_name: &str, num_shards: usize, num_rounds: usize) {
fn parse_stats_writer(
write_stats: bool,
write_stats_filepath: Option<String>,
workload_name: &str,
strategy_name: &str,
) -> StatsWriter {
if !write_stats {
return None;
}

let default_path = format!("stats_{}_{}.csv", workload_name, strategy_name);
let path = write_stats_filepath.unwrap_or(default_path);
let stats_writer = Box::new(csv::Writer::from_path(path).unwrap());
Some(stats_writer)
}

fn run_model(
strategy_name: &str,
workload_name: &str,
num_shards: usize,
num_rounds: usize,
mut stats_writer: StatsWriter,
) {
let strategy = strategy(strategy_name, num_shards);
let workload = workload(workload_name);
let mut model = Model::new(strategy, workload);
for _ in 0..num_rounds {

// Set the start time to one hour ago to make it visible by default in
// Grafana. Each round is 1 virtual second, so one hour is good for
// looking at a maximum of 3600 rounds; beyond that you'll need to customize
// the Grafana time range.
let start_time = Utc::now() - Duration::from_secs(1 * 60 * 60);

model.write_stats_header(&mut stats_writer);

for round in 0..num_rounds {
model.write_stats_values(&mut stats_writer, start_time, round);
model.step();
}
summary_table::print_summary_row(&model, workload_name, strategy_name);
@@ -83,20 +145,20 @@ fn strategy(strategy_name: &str, num_shards: usize) -> Vec<Box<dyn CongestionStr
result
}

fn parse_workflow_names(workflow_name: &str) -> Vec<String> {
fn parse_workload_names(workload_name: &str) -> Vec<String> {
let available: Vec<String> =
vec!["Balanced".to_string(), "All To One".to_string(), "Linear Imbalance".to_string()];

if workflow_name == "all" {
if workload_name == "all" {
return available;
}

for name in &available {
if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workflow_name) {
if normalize_cmdline_arg(name.as_ref()) == normalize_cmdline_arg(workload_name) {
return vec![name.to_string()];
}
}
panic!("The requested workflow name did not match any available workflows. Requested workflow name {:?}, The available workflows are: {:?}", workflow_name, available);
panic!("The requested workload name did not match any available workloads. Requested workload name {:?}, The available workloads are: {:?}", workload_name, available);
}

fn parse_strategy_names(strategy_name: &str) -> Vec<String> {
2 changes: 1 addition & 1 deletion tools/congestion-model/src/model/mod.rs
@@ -38,7 +38,7 @@ pub struct Model {
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ShardId(usize);
pub struct ShardId(pub usize);
Contributor commented:

Is this still needed, or just an artefact from previous code iterations? If it's not necessary, I'd prefer this to stay private.

Making it pub is okay for our little model if it makes things easier. But generally speaking, it breaks the intended type safety. While the value is private, we can guarantee that ShardId can only be created inside this file, so it's easy to check invariants on it. That's about half the motivation for wrapping the usize in a new type. It's the same pattern I used for QueueId, TransactionId, and ReceiptId.

If you just need read-only access to the number, adding a simple getter method, or maybe impl From<ShardId> for usize {} or even impl Deref for ShardId { type Target = usize }, could do the trick without breaking type properties while still allowing the number to be read with id.into() or *id, which seems just as convenient as id.0.
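
The three read-only options mentioned above might look roughly like the sketch below. It is only an illustration under assumptions: the `model` module, the `ShardId::new` constructor, the `index` getter, and the `column_name` helper are hypothetical names chosen for this example and are not taken from the PR.

```rust
mod model {
    use std::ops::Deref;

    /// Newtype with a private field: code outside this module cannot build a
    /// `ShardId` from an arbitrary number except through `ShardId::new`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
    pub struct ShardId(usize);

    impl ShardId {
        /// Hypothetical constructor; any invariant checks would live here.
        pub fn new(raw: usize) -> Self {
            ShardId(raw)
        }

        /// Option 1: an explicit getter.
        pub fn index(self) -> usize {
            self.0
        }
    }

    /// Option 2: conversion into `usize`, read with `id.into()`.
    impl From<ShardId> for usize {
        fn from(id: ShardId) -> usize {
            id.0
        }
    }

    /// Option 3: dereference to the inner number, read with `*id`.
    impl Deref for ShardId {
        type Target = usize;

        fn deref(&self) -> &usize {
            &self.0
        }
    }
}

use model::ShardId;

/// Example consumer outside the module: builds a stats column name
/// without ever touching the private field.
pub fn column_name(id: ShardId, queue_name: &str) -> String {
    format!("shard_{}_queue_{}", id.index(), queue_name)
}
```

In all three variants the field stays private, so writing `ShardId(7)` outside the `model` module does not compile, while reading the number stays about as short as `id.0`.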


impl Model {
pub fn new(
9 changes: 7 additions & 2 deletions tools/congestion-model/src/model/queue.rs
@@ -4,12 +4,13 @@ use std::collections::VecDeque;

pub struct Queue {
shard: ShardId,
name: String,
messages: VecDeque<Receipt>,
}

impl Queue {
pub fn new(shard: ShardId) -> Self {
Self { shard, messages: VecDeque::new() }
pub fn new(shard: ShardId, name: &str) -> Self {
Self { shard, name: name.to_string(), messages: VecDeque::new() }
}

pub fn size(&self) -> u64 {
@@ -19,6 +20,10 @@ impl Queue {
pub fn shard(&self) -> ShardId {
self.shard
}

pub fn name(&self) -> &str {
&self.name
}
}

impl std::ops::Deref for Queue {
10 changes: 5 additions & 5 deletions tools/congestion-model/src/model/queue_bundle.rs
@@ -23,17 +23,17 @@ impl QueueBundle {
};

for &shard in shards {
let mailbox = this.new_queue(shard);
let mailbox = this.new_queue(shard, "mailbox");
Contributor Author commented:

What the heck is mailbox? :)

Contributor replied:

Borrowed vocabulary from the actor model

https://www.brianstorti.com/the-actor-model/

The model basically treats each shard as an actor and the implicit incoming receipts queue is the actor's mailbox. And that's no coincidence. I always like to map problems to solutions that already exist.

Not only is the model implemented this way, I think even the real implementation can be thought of this way. Taking inspiration from Erlang, Akka, Actix, and other established actor frameworks could be useful. But it's also quite limited since they generally react to full queues by dropping messages / returning failures (which I still think we can avoid) and they don't operate on a global round-based clock that the blockchain has thanks to block heights. I think those are the main differences in constraints.

Anyway, sorry for the off-topic rambling... "mailbox" as the queue name makes sense in my mind but open to other names :)
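
To make the analogy concrete, here is a small sketch of a bounded actor-style mailbox. It is not code from the congestion model: `Mailbox`, `MailboxFull`, `try_send`, and `drain_round` are hypothetical names, and the fixed capacity and per-round budget are assumptions made for illustration. It shows the behaviour described above, where a full queue rejects the message, next to a round-based drain.

```rust
use std::collections::VecDeque;

/// Error returned when the mailbox is full; it hands the message back
/// to the sender instead of silently dropping it.
#[derive(Debug, PartialEq, Eq)]
pub struct MailboxFull<M>(pub M);

/// Hypothetical bounded mailbox holding an actor's (or shard's) incoming messages.
pub struct Mailbox<M> {
    capacity: usize,
    messages: VecDeque<M>,
}

impl<M> Mailbox<M> {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, messages: VecDeque::with_capacity(capacity) }
    }

    /// Rejects the message with a failure when the mailbox is full,
    /// otherwise enqueues it at the back.
    pub fn try_send(&mut self, message: M) -> Result<(), MailboxFull<M>> {
        if self.messages.len() >= self.capacity {
            return Err(MailboxFull(message));
        }
        self.messages.push_back(message);
        Ok(())
    }

    /// Processes one round: removes up to `budget` messages in FIFO order.
    pub fn drain_round(&mut self, budget: usize) -> Vec<M> {
        let count = budget.min(self.messages.len());
        self.messages.drain(..count).collect()
    }

    /// Number of messages currently waiting.
    pub fn len(&self) -> usize {
        self.messages.len()
    }
}
```

A congestion strategy that wants to avoid failures would have to stop the sender before `try_send` can return `MailboxFull`, for example by delaying outgoing receipts on the sending shard.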

this.shard_mailbox.insert(shard, mailbox);
this.transaction_queues.insert(shard, VecDeque::new());
}

this
}

pub fn new_queue(&mut self, shard_id: ShardId) -> QueueId {
pub fn new_queue(&mut self, shard_id: ShardId, name: &str) -> QueueId {
let id = self.receipt_queues.len();
self.receipt_queues.push(Queue::new(shard_id));
self.receipt_queues.push(Queue::new(shard_id, name));
QueueId(id)
}

@@ -71,7 +71,7 @@ impl QueueBundle {
}

impl QueueFactory for QueueBundle {
fn register_queue(&mut self, shard_id: ShardId) -> QueueId {
self.new_queue(shard_id)
fn register_queue(&mut self, shard_id: ShardId, name: &str) -> QueueId {
self.new_queue(shard_id, name)
}
}
2 changes: 1 addition & 1 deletion tools/congestion-model/src/strategy/mod.rs
@@ -24,5 +24,5 @@ pub trait CongestionStrategy {

/// Use this to create queues.
pub trait QueueFactory {
fn register_queue(&mut self, to: ShardId) -> QueueId;
fn register_queue(&mut self, to: ShardId, name: &str) -> QueueId;
}
@@ -21,7 +21,8 @@ impl crate::CongestionStrategy for SimpleBackpressure {
_other_shards: &[crate::ShardId],
queue_factory: &mut dyn QueueFactory,
) {
self.delayed_outgoing_receipts = Some(queue_factory.register_queue(id));
self.delayed_outgoing_receipts =
Some(queue_factory.register_queue(id, "delayed_outgoing_receipts"));
self.id = Some(id);
}
