Skip to content

Commit

Permalink
fix: last benchmark progress slow
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 24, 2025
1 parent 1d2224a commit 1a391a0
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 115 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/fluvio-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ['sync', 'macros'] }
serde = { workspace = true , features = ['derive'] }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
Expand Down
37 changes: 25 additions & 12 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::sync::Arc;

use anyhow::Result;
use async_channel::{bounded, unbounded};
use async_channel::unbounded;

use bytesize::ByteSize;
use fluvio_future::{future::timeout, task::spawn, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tracing::debug;

use crate::{
config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils,
config::ProducerConfig,
producer_worker::ProducerWorker,
stats_collector::{EndProducerStat, StatCollector, Stats},
utils,
};

pub struct ProducerBenchmark {}
Expand Down Expand Up @@ -49,11 +55,21 @@ impl ProducerBenchmark {

async fn run_samples(config: ProducerConfig) -> Result<()> {
let (stats_sender, stats_receiver) = unbounded();
let (end_sender, end_receiver) = bounded(1);
let (end_sender, mut end_receiver) = broadcast::channel(2);
let end_sender = Arc::new(end_sender);
let stat_collector =
StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone());

// Set up producers
Self::setup_producers(config.clone(), stat_collector).await;
println!("Benchmark started");
Self::print_progress_on_backgroud(stats_receiver).await;
Self::print_benchmark_on_end(&mut end_receiver).await;
println!("Benchmark completed");

Ok(())
}

async fn setup_producers(config: ProducerConfig, stat_collector: StatCollector) {
spawn(async move {
let worker_futures = FuturesUnordered::new();
for producer_id in 0..config.num_producers {
Expand All @@ -75,9 +91,9 @@ impl ProducerBenchmark {
worker.expect("producer worker failed");
}
});
}

println!("Benchmark started");

async fn print_progress_on_backgroud(stats_receiver: async_channel::Receiver<Stats>) {
spawn(async move {
while let Ok(stat) = stats_receiver.recv().await {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
Expand All @@ -91,7 +107,9 @@ impl ProducerBenchmark {
);
}
});
}

async fn print_benchmark_on_end(end_receiver: &mut broadcast::Receiver<EndProducerStat>) {
if let Ok(end) = end_receiver.recv().await {
// sleep enough time to make sure all stats are printed
sleep(std::time::Duration::from_secs(1)).await;
Expand All @@ -114,18 +132,13 @@ impl ProducerBenchmark {

let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
println!(
"{} total records sent, {} records/sec: ({}/sec), Total time: {}",
"{} total records sent, {} records/sec: ({}/sec), total time: {}",
end.total_records,
end.records_per_sec,
human_readable_bytes,
utils::pretty_duration(end.elapsed)
);
}

// Print stats
println!("Benchmark completed");

Ok(())
}
}

Expand Down
Loading

0 comments on commit 1a391a0

Please sign in to comment.