From 55b1dca16487196c03f7fa0a615b18c63053c759 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Thu, 23 Jan 2025 20:34:38 -0300 Subject: [PATCH 1/2] fix: benchmark dead channels --- Cargo.lock | 2 - crates/fluvio-benchmark/Cargo.toml | 2 - crates/fluvio-benchmark/src/cli.rs | 2 - crates/fluvio-benchmark/src/config/mod.rs | 2 +- .../src/producer_benchmark.rs | 152 +++++++++--------- .../fluvio-benchmark/src/producer_worker.rs | 35 +--- .../fluvio-benchmark/src/stats_collector.rs | 133 ++++++++------- crates/fluvio-benchmark/src/utils.rs | 67 +++++++- 8 files changed, 220 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ec5798c1d..13d630060a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2338,7 +2338,6 @@ dependencies = [ "anyhow", "async-channel 1.9.0", "bytesize", - "chrono", "clap", "derive_builder", "fluvio", @@ -2352,7 +2351,6 @@ dependencies = [ "serde", "serde_yaml", "thiserror 2.0.11", - "tokio", "tracing", ] diff --git a/crates/fluvio-benchmark/Cargo.toml b/crates/fluvio-benchmark/Cargo.toml index ce16e0f1a4..baef92d07c 100644 --- a/crates/fluvio-benchmark/Cargo.toml +++ b/crates/fluvio-benchmark/Cargo.toml @@ -12,7 +12,6 @@ publish = false anyhow = { workspace = true } async-channel = { workspace = true } bytesize = { workspace = true, features = ['serde'] } -chrono = { workspace = true, features = ['serde']} clap = { workspace = true, features = ["std","derive"] } derive_builder = { workspace = true } futures-util = { workspace = true } @@ -20,7 +19,6 @@ humantime = { workspace = true } hdrhistogram = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } -tokio = { workspace = true, features = ['macros'] } rayon = { workspace = true } serde = { workspace = true , features = ['derive'] } serde_yaml = { workspace = true } diff --git a/crates/fluvio-benchmark/src/cli.rs b/crates/fluvio-benchmark/src/cli.rs index 953999def4..d737952535 100644 --- a/crates/fluvio-benchmark/src/cli.rs +++ b/crates/fluvio-benchmark/src/cli.rs @@ -33,8 +33,6 @@ pub enum BenchmarkMode { } pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> { - println!("# Fluvio Benchmark Results"); - if let Some(mode) = opt.benchmark { BenchmarkDriver::run_benchmark(mode).await?; } else { diff --git a/crates/fluvio-benchmark/src/config/mod.rs b/crates/fluvio-benchmark/src/config/mod.rs index 9fadc1f044..cefd3eb9f4 100644 --- a/crates/fluvio-benchmark/src/config/mod.rs +++ b/crates/fluvio-benchmark/src/config/mod.rs @@ -12,7 +12,7 @@ use bytesize::ByteSize; use crate::utils; const DEFAULT_BATCH_SIZE: &str = "16kib"; -const DEFAULT_QUEUE_SIZE: u64 = 10; +const DEFAULT_QUEUE_SIZE: u64 = 100; const DEFAULT_MAX_REQUEST_SIZE: &str = "32mib"; const DEFAULT_LINGER: &str = "0ms"; const DEFAULT_SERVER_TIMEOUT: &str = "5000ms"; diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index ea477af6d2..8e3bf07237 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -1,10 +1,11 @@ use anyhow::Result; -use async_channel::{unbounded, Receiver}; +use async_channel::{bounded, unbounded}; use bytesize::ByteSize; -use fluvio_future::{task::spawn, future::timeout, timer::sleep}; +use fluvio_future::{future::timeout, task::spawn, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; -use tokio::select; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use tracing::debug; use crate::{ config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils, @@ -28,7 +29,7 @@ impl ProducerBenchmark { admin.create(topic_name.clone(), false, new_topic).await?; } - println!("created topic {}", topic_name); + debug!("created topic {}", topic_name); let result = ProducerBenchmark::run_samples(config.clone()).await; sleep(std::time::Duration::from_millis(100)).await; @@ -40,80 +41,85 @@ impl ProducerBenchmark { // Clean up topic if config.delete_topic { admin.delete::(topic_name.clone()).await?; - print!("Topic deleted successfully {}", topic_name.clone()); + debug!("Topic deleted successfully {}", topic_name.clone()); } Ok(()) } async fn run_samples(config: ProducerConfig) -> Result<()> { - let mut tx_controls = Vec::new(); - let mut workers_jh = Vec::new(); - let (stats_sender, stats_receiver) = unbounded(); - let (end_sender, end_receiver) = unbounded(); + let (end_sender, end_receiver) = bounded(1); let stat_collector = - StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone()); + StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone()); // Set up producers - for producer_id in 0..config.num_producers { - let (event_sender, event_receiver) = unbounded(); - stat_collector.add_producer(event_receiver); - println!("starting up producer {}", producer_id); - let (tx_control, rx_control) = unbounded(); - let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?; - let jh = spawn(timeout( - config.worker_timeout, - ProducerDriver::main_loop(rx_control, worker), - )); + spawn(async move { + let worker_futures = FuturesUnordered::new(); + for producer_id in 0..config.num_producers { + let (event_sender, event_receiver) = unbounded(); + stat_collector.add_producer(event_receiver); + let config = config.clone(); + let jh = spawn(timeout(config.worker_timeout, async move { + debug!("starting up producer {}", producer_id); + let worker = ProducerWorker::new(producer_id, config, event_sender) + .await + .expect("create producer worker"); + ProducerDriver::main_loop(worker).await.expect("main loop"); + })); + + worker_futures.push(jh); + } + + for worker in worker_futures.collect::>().await { + worker.expect("producer worker failed"); + } + }); - tx_control.send(ControlMessage::SendBatch).await?; - tx_controls.push(tx_control); - workers_jh.push(jh); - } println!("Benchmark started"); - loop { - select! { - stat_rx = stats_receiver.recv() => { - if let Ok(stat) = stat_rx { - let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string(); - println!( - "{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency", - stat.record_send, stat.records_per_sec, human_readable_bytes, - utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max) - ); - } - } - end = end_receiver.recv() => { - if let Ok(end) = end { - let mut latency_yaml = String::new(); - latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency", - utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64), - utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0)))); - for percentile in [0.5, 0.95, 0.99] { - latency_yaml.push_str(&format!( - ", {:.2}ms p{percentile:4.2}", - utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(percentile)), - )); - } - println!(); - println!("{}", latency_yaml); - - let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string(); - println!( - "{} total records sent, {} records/sec: ({}/sec) ", - end.total_records, end.records_per_sec, human_readable_bytes - ); - } - break; - } + spawn(async move { + while let Ok(stat) = stats_receiver.recv().await { + let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string(); + println!( + "{} records sent, {} records/sec: ({}/sec), {} avg latency, {} max latency", + stat.record_send, + stat.records_per_sec, + human_readable_bytes, + utils::nanos_to_ms_pritable(stat.latency_avg), + utils::nanos_to_ms_pritable(stat.latency_max) + ); } - } - - // Wait for all producers to finish - for jh in workers_jh { - jh.await??; + }); + + if let Ok(end) = end_receiver.recv().await { + // sleep enough time to make sure all stats are printed + sleep(std::time::Duration::from_secs(1)).await; + let mut latency_yaml = String::new(); + latency_yaml.push_str(&format!( + "{} avg latency, {} max latency", + utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64), + utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0)) + )); + for percentile in [0.5, 0.95, 0.99] { + latency_yaml.push_str(&format!( + ", {} p{percentile:4.2}", + utils::nanos_to_ms_pritable( + end.latencies_histogram.value_at_quantile(percentile) + ), + )); + } + println!(); + println!("{}", latency_yaml); + + let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string(); + println!( + "{} total records sent, {} records/sec: ({}/sec), Total time: {}", + end.total_records, + end.records_per_sec, + human_readable_bytes, + utils::pretty_duration(end.elapsed) + ); } // Print stats @@ -126,22 +132,8 @@ impl ProducerBenchmark { struct ProducerDriver; impl ProducerDriver { - async fn main_loop(rx: Receiver, worker: ProducerWorker) -> Result<()> { - //loop { - match rx.recv().await? { - ControlMessage::SendBatch => { - println!("producer send batch"); - if let Err(err) = worker.send_batch().await { - println!("producer send batch error: {:#?}", err); - } - } - }; - //} + async fn main_loop(worker: ProducerWorker) -> Result<()> { + worker.send_batch().await?; Ok(()) } } - -#[derive(Clone, Copy, Debug)] -enum ControlMessage { - SendBatch, -} diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs index 7ee1422767..c09b8a228d 100644 --- a/crates/fluvio-benchmark/src/producer_worker.rs +++ b/crates/fluvio-benchmark/src/producer_worker.rs @@ -9,6 +9,7 @@ use fluvio::{ TopicProducerConfigBuilder, TopicProducerPool, }; use futures_util::future::BoxFuture; +use tracing::debug; use crate::{ config::{ProducerConfig, RecordKeyAllocationStrategy}, @@ -68,7 +69,7 @@ impl ProducerWorker { .topic_producer_with_config(config.topic_name.clone(), fluvio_config) .await?; - let num_records = records_per_producer(id, config.num_producers, config.num_records); + let num_records = utils::records_per_producer(id, config.num_producers, config.num_records); let records_to_send = create_records(config.clone(), num_records, id); @@ -79,7 +80,7 @@ impl ProducerWorker { } pub async fn send_batch(self) -> Result<()> { - println!("producer is sending batch"); + debug!("producer is sending batch"); for record in self.records_to_send.into_iter() { let _ = self @@ -123,33 +124,3 @@ impl BenchmarkRecord { Self { key, data } } } - -/// Calculate the number of records each producer should send -fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 { - if id == 0 { - num_records / num_producers + num_records % num_producers - } else { - num_records / num_producers - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_get_num_records_per_producer() { - let num_producers = 3; - let num_records = 10; - - assert_eq!(records_per_producer(0, num_producers, num_records), 4); - assert_eq!(records_per_producer(1, num_producers, num_records), 3); - assert_eq!(records_per_producer(2, num_producers, num_records), 3); - - let num_producers = 3; - let num_records = 12; - assert_eq!(records_per_producer(0, num_producers, num_records), 4); - assert_eq!(records_per_producer(1, num_producers, num_records), 4); - assert_eq!(records_per_producer(2, num_producers, num_records), 4); - } -} diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index db0cd933cc..3f725b02b0 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -1,6 +1,6 @@ use std::{ f64, - sync::{atomic::AtomicU64, Arc}, + sync::{atomic::AtomicU64, Arc, OnceLock}, time::{Duration, Instant}, }; @@ -8,13 +8,14 @@ use async_channel::{Receiver, Sender}; use fluvio::ProduceCompletionBatchEvent; use fluvio_future::{sync::RwLock, task::spawn, timer::sleep}; use hdrhistogram::Histogram; +use tracing::trace; pub(crate) struct ProducerStat {} pub struct TotalStats { record_send: AtomicU64, record_bytes: AtomicU64, - first_start_time: RwLock>, + first_start_time: OnceLock, } pub struct Stats { @@ -31,6 +32,7 @@ pub struct EndProducerStat { pub total_records: u64, pub records_per_sec: u64, pub bytes_per_sec: u64, + pub elapsed: Duration, } impl ProducerStat { @@ -41,10 +43,10 @@ impl ProducerStat { latencies: Arc>>, event_receiver: Receiver, ) -> Self { - Self::track_latency( + Self::track_producer_stats( num_records, end_sender, - latencies, + latencies.clone(), event_receiver, total_stats.clone(), ); @@ -52,7 +54,7 @@ impl ProducerStat { Self {} } - fn track_latency( + fn track_producer_stats( num_records: u64, end_sender: Sender, latencies: Arc>>, @@ -63,68 +65,80 @@ impl ProducerStat { let latency_histogram = latencies.clone(); while let Ok(event) = event_receiver.recv().await { let latencies = latency_histogram.clone(); - let stats = total_stats.clone(); + let total_stats = total_stats.clone(); + let end_sender = end_sender.clone(); + total_stats + .first_start_time + .get_or_init(|| event.created_at); spawn(async move { - if stats.first_start_time.read().await.is_none() { - stats - .first_start_time - .write() - .await - .replace(event.created_at); - } let mut lantencies = latencies.write().await; lantencies.push(event.elapsed.as_nanos() as u64); drop(lantencies); - stats + total_stats .record_send .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed); - stats + total_stats .record_bytes .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed); + + ProducerStat::send_end(num_records, end_sender, latencies, total_stats).await; }); } + }); + } - // send end - let record_send = total_stats - .record_send + async fn send_end( + num_records: u64, + end_sender: Sender, + latencies: Arc>>, + total_stats: Arc, + ) { + let latency_histogram = latencies.clone(); + + let record_send = total_stats + .record_send + .load(std::sync::atomic::Ordering::Relaxed); + + if record_send >= num_records { + let record_bytes = total_stats + .record_bytes .load(std::sync::atomic::Ordering::Relaxed); - if record_send >= num_records { - let record_bytes = total_stats - .record_bytes - .load(std::sync::atomic::Ordering::Relaxed); - let latency_histogram = latency_histogram.read().await; - let elapsed = total_stats - .first_start_time - .read() - .await - .expect("start time") - .elapsed(); + let latency_histogram = latency_histogram.read().await; + let elapsed = total_stats + .first_start_time + .get() + .expect("start time") + .elapsed(); - let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; - let records_per_sec = (record_send as f64 / elapsed_seconds).round() as u64; - let bytes_per_sec = (record_bytes as f64 / elapsed_seconds).round() as u64; + let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; + let records_per_sec = (record_send as f64 / elapsed_seconds).round() as u64; + let bytes_per_sec = (record_bytes as f64 / elapsed_seconds).round() as u64; - let mut latencies_histogram = Histogram::::new(3).expect("new histogram"); - for value in latency_histogram.iter() { - latencies_histogram.record(*value).expect("record"); - } + let mut latencies_histogram = Histogram::::new(3).expect("new histogram"); + for value in latency_histogram.iter() { + latencies_histogram.record(*value).expect("record"); + } + + let end = EndProducerStat { + latencies_histogram, + total_records: record_send, + records_per_sec, + bytes_per_sec, + elapsed, + }; - let end = EndProducerStat { - latencies_histogram, - total_records: record_send, - records_per_sec, - bytes_per_sec, - }; - end_sender.send(end).await.expect("send end"); + // check if any producer already sent it + if let Err(e) = end_sender.send(end).await { + trace!("error sending end: {}", e); } - }); + } } } pub(crate) struct StatCollector { - total_stats: Arc, num_records: u64, + total_stats: Arc, end_sender: Sender, latencies_histogram: Arc>>, } @@ -132,19 +146,23 @@ pub(crate) struct StatCollector { impl StatCollector { pub(crate) fn create( num_records: u64, - end_sender: Sender, stats_sender: Sender, + end_sender: Sender, ) -> Self { let latencies = Arc::new(RwLock::new(Vec::with_capacity(num_records as usize))); let total_stats = Arc::new(TotalStats { record_send: AtomicU64::new(0), record_bytes: AtomicU64::new(0), - first_start_time: RwLock::new(None), + first_start_time: OnceLock::new(), }); - Self::send_stats(latencies.clone(), stats_sender.clone(), total_stats.clone()) - .expect("send stats"); + Self::send_stats( + num_records, + latencies.clone(), + stats_sender.clone(), + total_stats.clone(), + ); Self { total_stats, @@ -165,10 +183,11 @@ impl StatCollector { } fn send_stats( + num_records: u64, latencies: Arc>>, stats_sender: Sender, total_stats: Arc, - ) -> Result<(), std::io::Error> { + ) { spawn(async move { loop { let stats_sender = stats_sender.clone(); @@ -176,6 +195,10 @@ impl StatCollector { let old_record_send = total_stats .record_send .load(std::sync::atomic::Ordering::Relaxed); + if old_record_send >= num_records { + break; + } + let old_record_bytes = total_stats .record_bytes .load(std::sync::atomic::Ordering::Relaxed); @@ -183,7 +206,7 @@ impl StatCollector { let old_latencies_len = old_latencies.len(); drop(old_latencies); sleep(Duration::from_secs(1)).await; - let first_start_time = total_stats.first_start_time.read().await; + let first_start_time = total_stats.first_start_time.get(); if first_start_time.is_none() { continue; } @@ -224,7 +247,8 @@ impl StatCollector { let latency_avg = latencies_histogram.mean() as u64; let latency_max = latencies_histogram.value_at_quantile(1.0); - stats_sender + // ignore if channel is closed + if let Err(e) = stats_sender .send(Stats { record_send, record_bytes, @@ -234,9 +258,10 @@ impl StatCollector { latency_max, }) .await - .expect("send stats"); + { + trace!("error sending stats: {}", e); + } } }); - Ok(()) } } diff --git a/crates/fluvio-benchmark/src/utils.rs b/crates/fluvio-benchmark/src/utils.rs index abd14e419c..0288b7ae75 100644 --- a/crates/fluvio-benchmark/src/utils.rs +++ b/crates/fluvio-benchmark/src/utils.rs @@ -47,6 +47,69 @@ pub fn generate_random_string_vec(num: usize, size: usize) -> Vec { random_strings } -pub fn nanos_to_ms_pritable(nano: u64) -> f64 { - Duration::from_nanos(nano).as_secs_f64() * 1000.0 +pub fn nanos_to_ms_pritable(nano: u64) -> String { + pretty_duration(Duration::from_nanos(nano)) +} + +pub fn pretty_duration(d: Duration) -> String { + let nanos = d.as_nanos(); + // 1 ns = 1 + // 1 µs = 1,000 ns + // 1 ms = 1,000,000 ns + // 1 s = 1,000,000,000 ns + // 1 m = 60 s + + if nanos < 1_000 { + // Less than 1µs, display in ns + format!("{}ns", nanos) + } else if nanos < 1_000_000 { + // Less than 1ms, display in µs + let us = nanos as f64 / 1_000.0; + format!("{:.1}µs", us) + } else if nanos < 1_000_000_000 { + // Less than 1s, display in ms + let ms = nanos as f64 / 1_000_000.0; + format!("{:.1}ms", ms) + } else { + // Now we’re at least 1 second + let secs = nanos as f64 / 1_000_000_000.0; + if secs < 60.0 { + // Less than a minute, display in seconds + format!("{:.1}s", secs) + } else { + // Otherwise, display in minutes + let mins = secs / 60.0; + format!("{:.1}m", mins) + } + } +} + +/// Calculate the number of records each producer should send +pub fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 { + if id == 0 { + num_records / num_producers + num_records % num_producers + } else { + num_records / num_producers + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_num_records_per_producer() { + let num_producers = 3; + let num_records = 10; + + assert_eq!(records_per_producer(0, num_producers, num_records), 4); + assert_eq!(records_per_producer(1, num_producers, num_records), 3); + assert_eq!(records_per_producer(2, num_producers, num_records), 3); + + let num_producers = 3; + let num_records = 12; + assert_eq!(records_per_producer(0, num_producers, num_records), 4); + assert_eq!(records_per_producer(1, num_producers, num_records), 4); + assert_eq!(records_per_producer(2, num_producers, num_records), 4); + } } From 4718aec8419a4b6380ea2625483c1e8cff1ba4d0 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Fri, 24 Jan 2025 15:42:45 -0300 Subject: [PATCH 2/2] fix: last benchmark progress slow --- Cargo.lock | 2 + crates/fluvio-benchmark/Cargo.toml | 2 + .../src/producer_benchmark.rs | 37 ++- .../fluvio-benchmark/src/stats_collector.rs | 214 +++++++++--------- 4 files changed, 140 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13d630060a..6936171f41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2345,12 +2345,14 @@ dependencies = [ "futures-util", "hdrhistogram", "humantime", + "once_cell", "rand", "rand_xoshiro", "rayon", "serde", "serde_yaml", "thiserror 2.0.11", + "tokio", "tracing", ] diff --git a/crates/fluvio-benchmark/Cargo.toml b/crates/fluvio-benchmark/Cargo.toml index baef92d07c..d2a9524995 100644 --- a/crates/fluvio-benchmark/Cargo.toml +++ b/crates/fluvio-benchmark/Cargo.toml @@ -17,9 +17,11 @@ derive_builder = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } hdrhistogram = { workspace = true } +once_cell = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } rayon = { workspace = true } +tokio = { workspace = true, features = ['sync', 'macros'] } serde = { workspace = true , features = ['derive'] } serde_yaml = { workspace = true } thiserror = { workspace = true } diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index 8e3bf07237..50e092d064 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -1,14 +1,20 @@ +use std::sync::Arc; + use anyhow::Result; -use async_channel::{bounded, unbounded}; +use async_channel::unbounded; use bytesize::ByteSize; use fluvio_future::{future::timeout, task::spawn, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; use futures_util::{stream::FuturesUnordered, StreamExt}; +use tokio::sync::broadcast; use tracing::debug; use crate::{ - config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils, + config::ProducerConfig, + producer_worker::ProducerWorker, + stats_collector::{EndProducerStat, StatCollector, Stats}, + utils, }; pub struct ProducerBenchmark {} @@ -49,11 +55,21 @@ impl ProducerBenchmark { async fn run_samples(config: ProducerConfig) -> Result<()> { let (stats_sender, stats_receiver) = unbounded(); - let (end_sender, end_receiver) = bounded(1); + let (end_sender, mut end_receiver) = broadcast::channel(2); + let end_sender = Arc::new(end_sender); let stat_collector = StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone()); - // Set up producers + Self::setup_producers(config.clone(), stat_collector).await; + println!("Benchmark started"); + Self::print_progress_on_backgroud(stats_receiver).await; + Self::print_benchmark_on_end(&mut end_receiver).await; + println!("Benchmark completed"); + + Ok(()) + } + + async fn setup_producers(config: ProducerConfig, stat_collector: StatCollector) { spawn(async move { let worker_futures = FuturesUnordered::new(); for producer_id in 0..config.num_producers { @@ -75,9 +91,9 @@ impl ProducerBenchmark { worker.expect("producer worker failed"); } }); + } - println!("Benchmark started"); - + async fn print_progress_on_backgroud(stats_receiver: async_channel::Receiver) { spawn(async move { while let Ok(stat) = stats_receiver.recv().await { let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string(); @@ -91,7 +107,9 @@ impl ProducerBenchmark { ); } }); + } + async fn print_benchmark_on_end(end_receiver: &mut broadcast::Receiver) { if let Ok(end) = end_receiver.recv().await { // sleep enough time to make sure all stats are printed sleep(std::time::Duration::from_secs(1)).await; @@ -114,18 +132,13 @@ impl ProducerBenchmark { let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string(); println!( - "{} total records sent, {} records/sec: ({}/sec), Total time: {}", + "{} total records sent, {} records/sec: ({}/sec), total time: {}", end.total_records, end.records_per_sec, human_readable_bytes, utils::pretty_duration(end.elapsed) ); } - - // Print stats - println!("Benchmark completed"); - - Ok(()) } } diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index 3f725b02b0..52acb5d807 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -1,13 +1,15 @@ use std::{ f64, - sync::{atomic::AtomicU64, Arc, OnceLock}, + sync::{atomic::AtomicU64, Arc}, time::{Duration, Instant}, }; use async_channel::{Receiver, Sender}; use fluvio::ProduceCompletionBatchEvent; -use fluvio_future::{sync::RwLock, task::spawn, timer::sleep}; +use fluvio_future::{sync::RwLock, task::spawn}; use hdrhistogram::Histogram; +use once_cell::sync::OnceCell; +use tokio::{select, sync::broadcast}; use tracing::trace; pub(crate) struct ProducerStat {} @@ -15,7 +17,13 @@ pub(crate) struct ProducerStat {} pub struct TotalStats { record_send: AtomicU64, record_bytes: AtomicU64, - first_start_time: OnceLock, + first_start_time: OnceCell, +} + +pub struct CentralStats { + pub record_send: u64, + pub record_bytes: u64, + pub latency: u64, } pub struct Stats { @@ -27,6 +35,7 @@ pub struct Stats { pub latency_max: u64, } +#[derive(Clone)] pub struct EndProducerStat { pub latencies_histogram: Histogram, pub total_records: u64, @@ -37,16 +46,16 @@ pub struct EndProducerStat { impl ProducerStat { pub(crate) fn new( + central_stats_tx: Sender, num_records: u64, - end_sender: Sender, + end_sender: Arc>, total_stats: Arc, - latencies: Arc>>, event_receiver: Receiver, ) -> Self { Self::track_producer_stats( + central_stats_tx, num_records, end_sender, - latencies.clone(), event_receiver, total_stats.clone(), ); @@ -55,25 +64,26 @@ impl ProducerStat { } fn track_producer_stats( + central_stats_tx: Sender, num_records: u64, - end_sender: Sender, - latencies: Arc>>, + end_sender: Arc>, event_receiver: Receiver, total_stats: Arc, ) { + let latencies = Arc::new(RwLock::new(Vec::with_capacity(num_records as usize))); spawn(async move { - let latency_histogram = latencies.clone(); while let Ok(event) = event_receiver.recv().await { - let latencies = latency_histogram.clone(); - let total_stats = total_stats.clone(); - let end_sender = end_sender.clone(); total_stats .first_start_time .get_or_init(|| event.created_at); + let latencies = latencies.clone(); + let total_stats = total_stats.clone(); + let end_sender = end_sender.clone(); + let central_stats_tx = central_stats_tx.clone(); spawn(async move { - let mut lantencies = latencies.write().await; - lantencies.push(event.elapsed.as_nanos() as u64); - drop(lantencies); + let mut write_latencies = latencies.write().await; + write_latencies.push(event.elapsed.as_nanos() as u64); + drop(write_latencies); total_stats .record_send @@ -82,6 +92,15 @@ impl ProducerStat { .record_bytes .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed); + central_stats_tx + .send(CentralStats { + record_send: event.records_len, + record_bytes: event.bytes_size, + latency: event.elapsed.as_nanos() as u64, + }) + .await + .expect("send stats"); + ProducerStat::send_end(num_records, end_sender, latencies, total_stats).await; }); } @@ -90,7 +109,7 @@ impl ProducerStat { async fn send_end( num_records: u64, - end_sender: Sender, + end_sender: Arc>, latencies: Arc>>, total_stats: Arc, ) { @@ -129,7 +148,7 @@ impl ProducerStat { }; // check if any producer already sent it - if let Err(e) = end_sender.send(end).await { + if let Err(e) = end_sender.send(end) { trace!("error sending end: {}", e); } } @@ -139,127 +158,116 @@ impl ProducerStat { pub(crate) struct StatCollector { num_records: u64, total_stats: Arc, - end_sender: Sender, - latencies_histogram: Arc>>, + end_sender: Arc>, + central_stats_tx: Sender, } impl StatCollector { pub(crate) fn create( num_records: u64, - stats_sender: Sender, - end_sender: Sender, + print_stats_sender: Sender, + end_sender: Arc>, ) -> Self { - let latencies = Arc::new(RwLock::new(Vec::with_capacity(num_records as usize))); + let (central_stats_tx, central_stats_rx) = async_channel::unbounded(); let total_stats = Arc::new(TotalStats { record_send: AtomicU64::new(0), record_bytes: AtomicU64::new(0), - first_start_time: OnceLock::new(), + first_start_time: OnceCell::new(), }); - Self::send_stats( - num_records, - latencies.clone(), - stats_sender.clone(), - total_stats.clone(), + Self::send_central_stats( + print_stats_sender.clone(), + end_sender.clone(), + central_stats_rx, ); Self { total_stats, num_records, end_sender, - latencies_histogram: latencies, + central_stats_tx, } } pub(crate) fn add_producer(&self, event_receiver: Receiver) { ProducerStat::new( + self.central_stats_tx.clone(), self.num_records, self.end_sender.clone(), self.total_stats.clone(), - self.latencies_histogram.clone(), event_receiver, ); } - fn send_stats( - num_records: u64, - latencies: Arc>>, + fn send_central_stats( stats_sender: Sender, - total_stats: Arc, + end_broadcast: Arc>, + central_stats_rx: Receiver, ) { spawn(async move { - loop { - let stats_sender = stats_sender.clone(); - let total_stats = Arc::clone(&total_stats); - let old_record_send = total_stats - .record_send - .load(std::sync::atomic::Ordering::Relaxed); - if old_record_send >= num_records { - break; - } - - let old_record_bytes = total_stats - .record_bytes - .load(std::sync::atomic::Ordering::Relaxed); - let old_latencies = latencies.read().await; - let old_latencies_len = old_latencies.len(); - drop(old_latencies); - sleep(Duration::from_secs(1)).await; - let first_start_time = total_stats.first_start_time.get(); - if first_start_time.is_none() { - continue; - } - - let total_record_send = total_stats - .record_send - .load(std::sync::atomic::Ordering::Relaxed); - let total_record_bytes = total_stats - .record_bytes - .load(std::sync::atomic::Ordering::Relaxed); - - if total_record_send == old_record_send { - continue; - } + let mut is_the_first = true; + let mut instant = Instant::now(); + let mut central_stats = CentralStats { + record_send: 0, + record_bytes: 0, + latency: 0, + }; + let mut latencies_histogram = + hdrhistogram::Histogram::::new(3).expect("new histogram"); - let record_send = total_record_send - old_record_send; - let record_bytes = total_record_bytes - old_record_bytes; - let elapsed = first_start_time.expect("start time").elapsed(); - let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; - let records_per_sec = (total_record_send as f64 / elapsed_seconds).round() as u64; - let bytes_per_sec = (total_record_bytes as f64 / elapsed_seconds).round() as u64; - - let total_latencies = latencies.read().await; - - let new_latencies = total_latencies - .clone() - .into_iter() - .skip(old_latencies_len) - .collect::>(); - drop(total_latencies); - - let mut latencies_histogram = - hdrhistogram::Histogram::::new(3).expect("new histogram"); - for value in new_latencies { - latencies_histogram.record(value).expect("record"); - } + let mut end_broadcast = end_broadcast.subscribe(); - let latency_avg = latencies_histogram.mean() as u64; - let latency_max = latencies_histogram.value_at_quantile(1.0); - - // ignore if channel is closed - if let Err(e) = stats_sender - .send(Stats { - record_send, - record_bytes, - records_per_sec, - bytes_per_sec, - latency_avg, - latency_max, - }) - .await - { - trace!("error sending stats: {}", e); + loop { + select! { + stat = central_stats_rx.recv() => { + if let Ok(stats) = stat { + if is_the_first { + is_the_first = false; + instant = Instant::now(); + } + + let elapsed = instant.elapsed(); + let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0; + + central_stats.record_send += stats.record_send; + central_stats.record_bytes += stats.record_bytes; + central_stats.latency += stats.latency; + latencies_histogram.record(stats.latency).expect("record"); + + if elapsed.as_secs() >= 5 { + let latency_avg = latencies_histogram.mean() as u64; + let latency_max = latencies_histogram.value_at_quantile(1.0); + let records_per_sec = + (central_stats.record_send as f64 / elapsed_seconds).round() as u64; + let bytes_per_sec = + (central_stats.record_bytes as f64 / elapsed_seconds).round() as u64; + + let _ = stats_sender + .send(Stats { + record_send: central_stats.record_send, + record_bytes: central_stats.record_bytes, + records_per_sec, + bytes_per_sec, + latency_avg, + latency_max, + }) + .await; + + instant = Instant::now(); + central_stats = CentralStats { + record_send: 0, + record_bytes: 0, + latency: 0, + }; + latencies_histogram = + hdrhistogram::Histogram::::new(3).expect("new histogram"); + } + } + } + _ = end_broadcast.recv() => { + break + } } } });