diff --git a/crates/fluvio-benchmark/src/cli.rs b/crates/fluvio-benchmark/src/cli.rs
index 953999def4..d737952535 100644
--- a/crates/fluvio-benchmark/src/cli.rs
+++ b/crates/fluvio-benchmark/src/cli.rs
@@ -33,8 +33,6 @@ pub enum BenchmarkMode {
 }
 
 pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> {
-    println!("# Fluvio Benchmark Results");
-
     if let Some(mode) = opt.benchmark {
         BenchmarkDriver::run_benchmark(mode).await?;
     } else {
diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs
index ea477af6d2..5a2f0055f9 100644
--- a/crates/fluvio-benchmark/src/producer_benchmark.rs
+++ b/crates/fluvio-benchmark/src/producer_benchmark.rs
@@ -1,10 +1,12 @@
 use anyhow::Result;
-use async_channel::{unbounded, Receiver};
+use async_channel::unbounded;
 use bytesize::ByteSize;
-use fluvio_future::{task::spawn, future::timeout, timer::sleep};
+use fluvio_future::{future::timeout, task::spawn, timer::sleep};
 use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
+use futures_util::{stream::FuturesUnordered, StreamExt};
 use tokio::select;
+use tracing::debug;
 
 use crate::{
     config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils,
@@ -28,7 +30,7 @@ impl ProducerBenchmark {
             admin.create(topic_name.clone(), false, new_topic).await?;
         }
 
-        println!("created topic {}", topic_name);
+        debug!("created topic {}", topic_name);
         let result = ProducerBenchmark::run_samples(config.clone()).await;
 
         sleep(std::time::Duration::from_millis(100)).await;
@@ -40,37 +42,41 @@ impl ProducerBenchmark {
         // Clean up topic
         if config.delete_topic {
             admin.delete::<TopicSpec>(topic_name.clone()).await?;
-            print!("Topic deleted successfully {}", topic_name.clone());
+            debug!("Topic deleted successfully {}", topic_name.clone());
         }
 
         Ok(())
     }
 
     async fn run_samples(config: ProducerConfig) -> Result<()> {
-        let mut tx_controls = Vec::new();
-        let mut workers_jh = Vec::new();
-
         let (stats_sender, stats_receiver) = unbounded();
         let (end_sender, end_receiver) = unbounded();
         let stat_collector =
-            StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone());
+            StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone());
 
         // Set up producers
-        for producer_id in 0..config.num_producers {
-            let (event_sender, event_receiver) = unbounded();
-            stat_collector.add_producer(event_receiver);
-            println!("starting up producer {}", producer_id);
-            let (tx_control, rx_control) = unbounded();
-            let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?;
-            let jh = spawn(timeout(
-                config.worker_timeout,
-                ProducerDriver::main_loop(rx_control, worker),
-            ));
-
-            tx_control.send(ControlMessage::SendBatch).await?;
-            tx_controls.push(tx_control);
-            workers_jh.push(jh);
-        }
+        spawn(async move {
+            let worker_futures = FuturesUnordered::new();
+            for producer_id in 0..config.num_producers {
+                let (event_sender, event_receiver) = unbounded();
+                stat_collector.add_producer(event_receiver);
+                let config = config.clone();
+                let jh = spawn(timeout(config.worker_timeout, async move {
+                    debug!("starting up producer {}", producer_id);
+                    let worker = ProducerWorker::new(producer_id, config, event_sender)
+                        .await
+                        .expect("create producer worker");
+                    ProducerDriver::main_loop(worker).await.expect("main loop");
+                }));
+
+                worker_futures.push(jh);
+            }
+
+            for worker in worker_futures.collect::<Vec<_>>().await {
+                worker.expect("producer worker failed");
+            }
+        });
+
         println!("Benchmark started");
 
         loop {
@@ -105,17 +111,13 @@ impl ProducerBenchmark {
                         "{} total records sent, {} records/sec: ({}/sec) ",
                         end.total_records, end.records_per_sec, human_readable_bytes
                     );
+
+                        break;
                     }
-                    break;
                 }
             }
         }
 
-        // Wait for all producers to finish
-        for jh in workers_jh {
-            jh.await??;
-        }
-
         // Print stats
         println!("Benchmark completed");
 
@@ -126,22 +128,8 @@ impl ProducerBenchmark {
 struct ProducerDriver;
 
 impl ProducerDriver {
-    async fn main_loop(rx: Receiver<ControlMessage>, worker: ProducerWorker) -> Result<()> {
-        //loop {
-        match rx.recv().await? {
-            ControlMessage::SendBatch => {
-                println!("producer send batch");
-                if let Err(err) = worker.send_batch().await {
-                    println!("producer send batch error: {:#?}", err);
-                }
-            }
-        };
-        //}
+    async fn main_loop(worker: ProducerWorker) -> Result<()> {
+        worker.send_batch().await?;
         Ok(())
     }
 }
-
-#[derive(Clone, Copy, Debug)]
-enum ControlMessage {
-    SendBatch,
-}
diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs
index 7ee1422767..c09b8a228d 100644
--- a/crates/fluvio-benchmark/src/producer_worker.rs
+++ b/crates/fluvio-benchmark/src/producer_worker.rs
@@ -9,6 +9,7 @@ use fluvio::{
     TopicProducerConfigBuilder, TopicProducerPool,
 };
 use futures_util::future::BoxFuture;
+use tracing::debug;
 
 use crate::{
     config::{ProducerConfig, RecordKeyAllocationStrategy},
@@ -68,7 +69,7 @@ impl ProducerWorker {
             .topic_producer_with_config(config.topic_name.clone(), fluvio_config)
             .await?;
 
-        let num_records = records_per_producer(id, config.num_producers, config.num_records);
+        let num_records = utils::records_per_producer(id, config.num_producers, config.num_records);
 
         let records_to_send = create_records(config.clone(), num_records, id);
 
@@ -79,7 +80,7 @@ impl ProducerWorker {
     }
 
     pub async fn send_batch(self) -> Result<()> {
-        println!("producer is sending batch");
+        debug!("producer is sending batch");
 
         for record in self.records_to_send.into_iter() {
             let _ = self
@@ -123,33 +124,3 @@ impl BenchmarkRecord {
         Self { key, data }
     }
 }
-
-/// Calculate the number of records each producer should send
-fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 {
-    if id == 0 {
-        num_records / num_producers + num_records % num_producers
-    } else {
-        num_records / num_producers
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_get_num_records_per_producer() {
-        let num_producers = 3;
-        let num_records = 10;
-
-        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
-        assert_eq!(records_per_producer(1, num_producers, num_records), 3);
-        assert_eq!(records_per_producer(2, num_producers, num_records), 3);
-
-        let num_producers = 3;
-        let num_records = 12;
-        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
-        assert_eq!(records_per_producer(1, num_producers, num_records), 4);
-        assert_eq!(records_per_producer(2, num_producers, num_records), 4);
-    }
-}
diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs
index db0cd933cc..6a4e689a64 100644
--- a/crates/fluvio-benchmark/src/stats_collector.rs
+++ b/crates/fluvio-benchmark/src/stats_collector.rs
@@ -1,6 +1,6 @@
 use std::{
     f64,
-    sync::{atomic::AtomicU64, Arc},
+    sync::{atomic::AtomicU64, Arc, OnceLock},
     time::{Duration, Instant},
 };
 
@@ -8,13 +8,14 @@ use async_channel::{Receiver, Sender};
 use fluvio::ProduceCompletionBatchEvent;
 use fluvio_future::{sync::RwLock, task::spawn, timer::sleep};
 use hdrhistogram::Histogram;
+use tracing::trace;
 
 pub(crate) struct ProducerStat {}
 
 pub struct TotalStats {
     record_send: AtomicU64,
     record_bytes: AtomicU64,
-    first_start_time: RwLock<Option<Instant>>,
+    first_start_time: OnceLock<Instant>,
 }
 
 pub struct Stats {
@@ -41,10 +42,10 @@ impl ProducerStat {
         latencies: Arc<RwLock<Vec<u64>>>,
         event_receiver: Receiver<ProduceCompletionBatchEvent>,
     ) -> Self {
-        Self::track_latency(
+        Self::track_producer_stats(
            num_records,
            end_sender,
-            latencies,
+            latencies.clone(),
            event_receiver,
            total_stats.clone(),
        );
@@ -52,7 +53,7 @@ impl ProducerStat {
         Self {}
     }
 
-    fn track_latency(
+    fn track_producer_stats(
         num_records: u64,
         end_sender: Sender<EndProducerStat>,
         latencies: Arc<RwLock<Vec<u64>>>,
@@ -63,68 +64,79 @@ impl ProducerStat {
             let latency_histogram = latencies.clone();
             while let Ok(event) = event_receiver.recv().await {
                 let latencies = latency_histogram.clone();
-                let stats = total_stats.clone();
+                let total_stats = total_stats.clone();
+                let end_sender = end_sender.clone();
+                total_stats
+                    .first_start_time
+                    .get_or_init(|| event.created_at);
                 spawn(async move {
-                    if stats.first_start_time.read().await.is_none() {
-                        stats
-                            .first_start_time
-                            .write()
-                            .await
-                            .replace(event.created_at);
-                    }
                     let mut lantencies = latencies.write().await;
                     lantencies.push(event.elapsed.as_nanos() as u64);
                     drop(lantencies);
 
-                    stats
+                    total_stats
                         .record_send
                         .fetch_add(event.records_len, std::sync::atomic::Ordering::Relaxed);
-                    stats
+                    total_stats
                         .record_bytes
                         .fetch_add(event.bytes_size, std::sync::atomic::Ordering::Relaxed);
+
+                    ProducerStat::send_end(num_records, end_sender, latencies, total_stats).await;
                 });
             }
+        });
+    }
 
-            // send end
-            let record_send = total_stats
-                .record_send
+    async fn send_end(
+        num_records: u64,
+        end_sender: Sender<EndProducerStat>,
+        latencies: Arc<RwLock<Vec<u64>>>,
+        total_stats: Arc<TotalStats>,
+    ) {
+        let latency_histogram = latencies.clone();
+
+        let record_send = total_stats
+            .record_send
+            .load(std::sync::atomic::Ordering::Relaxed);
+
+        if record_send >= num_records {
+            let record_bytes = total_stats
+                .record_bytes
                 .load(std::sync::atomic::Ordering::Relaxed);
-            if record_send >= num_records {
-                let record_bytes = total_stats
-                    .record_bytes
-                    .load(std::sync::atomic::Ordering::Relaxed);
-                let latency_histogram = latency_histogram.read().await;
-                let elapsed = total_stats
-                    .first_start_time
-                    .read()
-                    .await
-                    .expect("start time")
-                    .elapsed();
+            let latency_histogram = latency_histogram.read().await;
+            let elapsed = total_stats
+                .first_start_time
+                .get()
+                .expect("start time")
+                .elapsed();
 
-                let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
-                let records_per_sec = (record_send as f64 / elapsed_seconds).round() as u64;
-                let bytes_per_sec = (record_bytes as f64 / elapsed_seconds).round() as u64;
+            let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
+            let records_per_sec = (record_send as f64 / elapsed_seconds).round() as u64;
+            let bytes_per_sec = (record_bytes as f64 / elapsed_seconds).round() as u64;
 
-                let mut latencies_histogram = Histogram::<u64>::new(3).expect("new histogram");
-                for value in latency_histogram.iter() {
-                    latencies_histogram.record(*value).expect("record");
-                }
+            let mut latencies_histogram = Histogram::<u64>::new(3).expect("new histogram");
+            for value in latency_histogram.iter() {
+                latencies_histogram.record(*value).expect("record");
+            }
+
+            let end = EndProducerStat {
+                latencies_histogram,
+                total_records: record_send,
+                records_per_sec,
+                bytes_per_sec,
+            };
 
-                let end = EndProducerStat {
-                    latencies_histogram,
-                    total_records: record_send,
-                    records_per_sec,
-                    bytes_per_sec,
-                };
-                end_sender.send(end).await.expect("send end");
+            // check if any producer already sent it
+            if let Err(e) = end_sender.send(end).await {
+                trace!("error sending end: {}", e);
             }
-        });
+        }
     }
 }
 
 pub(crate) struct StatCollector {
-    total_stats: Arc<TotalStats>,
     num_records: u64,
+    total_stats: Arc<TotalStats>,
     end_sender: Sender<EndProducerStat>,
     latencies_histogram: Arc<RwLock<Vec<u64>>>,
 }
@@ -132,19 +144,23 @@ pub(crate) struct StatCollector {
 impl StatCollector {
     pub(crate) fn create(
         num_records: u64,
-        end_sender: Sender<EndProducerStat>,
         stats_sender: Sender<Stats>,
+        end_sender: Sender<EndProducerStat>,
     ) -> Self {
         let latencies = Arc::new(RwLock::new(Vec::with_capacity(num_records as usize)));
         let total_stats = Arc::new(TotalStats {
             record_send: AtomicU64::new(0),
             record_bytes: AtomicU64::new(0),
-            first_start_time: RwLock::new(None),
+            first_start_time: OnceLock::new(),
         });
 
-        Self::send_stats(latencies.clone(), stats_sender.clone(), total_stats.clone())
-            .expect("send stats");
+        Self::send_stats(
+            num_records,
+            latencies.clone(),
+            stats_sender.clone(),
+            total_stats.clone(),
+        );
 
         Self {
             total_stats,
@@ -165,10 +181,11 @@ impl StatCollector {
     }
 
     fn send_stats(
+        num_records: u64,
         latencies: Arc<RwLock<Vec<u64>>>,
         stats_sender: Sender<Stats>,
         total_stats: Arc<TotalStats>,
-    ) -> Result<(), std::io::Error> {
+    ) {
         spawn(async move {
             loop {
                 let stats_sender = stats_sender.clone();
@@ -176,6 +193,10 @@ impl StatCollector {
                 let old_record_send = total_stats
                     .record_send
                     .load(std::sync::atomic::Ordering::Relaxed);
+                if old_record_send >= num_records {
+                    break;
+                }
+
                 let old_record_bytes = total_stats
                     .record_bytes
                     .load(std::sync::atomic::Ordering::Relaxed);
@@ -183,7 +204,7 @@ impl StatCollector {
                 let old_latencies_len = old_latencies.len();
                 drop(old_latencies);
                 sleep(Duration::from_secs(1)).await;
-                let first_start_time = total_stats.first_start_time.read().await;
+                let first_start_time = total_stats.first_start_time.get();
                 if first_start_time.is_none() {
                     continue;
                 }
@@ -224,7 +245,8 @@ impl StatCollector {
                 let latency_avg = latencies_histogram.mean() as u64;
                 let latency_max = latencies_histogram.value_at_quantile(1.0);
 
-                stats_sender
+                // ignore if channel is closed
+                if let Err(e) = stats_sender
                     .send(Stats {
                         record_send,
                         record_bytes,
@@ -234,9 +256,10 @@ impl StatCollector {
                         latency_max,
                     })
                     .await
-                    .expect("send stats");
+                {
+                    trace!("error sending stats: {}", e);
+                }
             }
         });
-        Ok(())
     }
 }
diff --git a/crates/fluvio-benchmark/src/utils.rs b/crates/fluvio-benchmark/src/utils.rs
index abd14e419c..6e062122ad 100644
--- a/crates/fluvio-benchmark/src/utils.rs
+++ b/crates/fluvio-benchmark/src/utils.rs
@@ -50,3 +50,33 @@ pub fn generate_random_string_vec(num: usize, size: usize) -> Vec<String> {
 pub fn nanos_to_ms_pritable(nano: u64) -> f64 {
     Duration::from_nanos(nano).as_secs_f64() * 1000.0
 }
+
+/// Calculate the number of records each producer should send
+pub fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 {
+    if id == 0 {
+        num_records / num_producers + num_records % num_producers
+    } else {
+        num_records / num_producers
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_get_num_records_per_producer() {
+        let num_producers = 3;
+        let num_records = 10;
+
+        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
+        assert_eq!(records_per_producer(1, num_producers, num_records), 3);
+        assert_eq!(records_per_producer(2, num_producers, num_records), 3);
+
+        let num_producers = 3;
+        let num_records = 12;
+        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
+        assert_eq!(records_per_producer(1, num_producers, num_records), 4);
+        assert_eq!(records_per_producer(2, num_producers, num_records), 4);
+    }
+}
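
For context on the producer startup change above: the per-producer control channels are replaced by a single supervisor task that spawns each worker under its own timeout and then drains a FuturesUnordered of join handles. The sketch below illustrates only that pattern; it is a minimal, hypothetical example built on plain tokio and futures-util rather than fluvio_future, and the worker body is a stand-in for ProducerWorker::new plus ProducerDriver::main_loop.

use std::time::Duration;

use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let workers = FuturesUnordered::new();

    for worker_id in 0..3u64 {
        // Each handle resolves to the timeout's Result once the worker finishes.
        workers.push(tokio::spawn(timeout(Duration::from_secs(5), async move {
            // Stand-in for the real producer work.
            println!("worker {worker_id} sending batch");
        })));
    }

    // Drain every handle; a panicked or timed-out worker is reported here.
    for joined in workers.collect::<Vec<_>>().await {
        joined.expect("worker task panicked").expect("worker timed out");
    }
}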