Skip to content

Commit

Permalink
fix: benchmark dead channels
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 24, 2025
1 parent e02ac38 commit f8522e2
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 144 deletions.
2 changes: 0 additions & 2 deletions crates/fluvio-benchmark/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ pub enum BenchmarkMode {
}

pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> {
println!("# Fluvio Benchmark Results");

if let Some(mode) = opt.benchmark {
BenchmarkDriver::run_benchmark(mode).await?;
} else {
Expand Down
76 changes: 32 additions & 44 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use anyhow::Result;
use async_channel::{unbounded, Receiver};
use async_channel::unbounded;

use bytesize::ByteSize;
use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio_future::{future::timeout, io::StreamExt, task::spawn, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use futures_util::stream::FuturesUnordered;
use tokio::select;
use tracing::debug;

use crate::{
config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils,
Expand All @@ -28,7 +30,7 @@ impl ProducerBenchmark {
admin.create(topic_name.clone(), false, new_topic).await?;
}

println!("created topic {}", topic_name);
debug!("created topic {}", topic_name);
let result = ProducerBenchmark::run_samples(config.clone()).await;

sleep(std::time::Duration::from_millis(100)).await;
Expand All @@ -40,37 +42,41 @@ impl ProducerBenchmark {
// Clean up topic
if config.delete_topic {
admin.delete::<TopicSpec>(topic_name.clone()).await?;
print!("Topic deleted successfully {}", topic_name.clone());
debug!("Topic deleted successfully {}", topic_name.clone());
}

Ok(())
}

async fn run_samples(config: ProducerConfig) -> Result<()> {
let mut tx_controls = Vec::new();
let mut workers_jh = Vec::new();

let (stats_sender, stats_receiver) = unbounded();
let (end_sender, end_receiver) = unbounded();
let stat_collector =
StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone());

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
println!("starting up producer {}", producer_id);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
));

tx_control.send(ControlMessage::SendBatch).await?;
tx_controls.push(tx_control);
workers_jh.push(jh);
}
spawn(async move {
let worker_futures = FuturesUnordered::new();
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
let config = config.clone();
let jh = spawn(timeout(config.worker_timeout, async move {
debug!("starting up producer {}", producer_id);
let worker = ProducerWorker::new(producer_id, config, event_sender)
.await
.expect("create producer worker");
ProducerDriver::main_loop(worker).await.expect("main loop");
}));

worker_futures.push(jh);
}

for worker in worker_futures.collect::<Vec<_>>().await {
worker.expect("producer worker failed");
}
});

println!("Benchmark started");

loop {
Expand Down Expand Up @@ -105,17 +111,13 @@ impl ProducerBenchmark {
"{} total records sent, {} records/sec: ({}/sec) ",
end.total_records, end.records_per_sec, human_readable_bytes
);

break;
}
break;
}
}
}

// Wait for all producers to finish
for jh in workers_jh {
jh.await??;
}

// Print stats
println!("Benchmark completed");

Expand All @@ -126,22 +128,8 @@ impl ProducerBenchmark {
struct ProducerDriver;

impl ProducerDriver {
async fn main_loop(rx: Receiver<ControlMessage>, worker: ProducerWorker) -> Result<()> {
//loop {
match rx.recv().await? {
ControlMessage::SendBatch => {
println!("producer send batch");
if let Err(err) = worker.send_batch().await {
println!("producer send batch error: {:#?}", err);
}
}
};
//}
async fn main_loop(worker: ProducerWorker) -> Result<()> {
worker.send_batch().await?;
Ok(())
}
}

#[derive(Clone, Copy, Debug)]
enum ControlMessage {
SendBatch,
}
35 changes: 3 additions & 32 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fluvio::{
TopicProducerConfigBuilder, TopicProducerPool,
};
use futures_util::future::BoxFuture;
use tracing::debug;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand Down Expand Up @@ -68,7 +69,7 @@ impl ProducerWorker {
.topic_producer_with_config(config.topic_name.clone(), fluvio_config)
.await?;

let num_records = records_per_producer(id, config.num_producers, config.num_records);
let num_records = utils::records_per_producer(id, config.num_producers, config.num_records);

let records_to_send = create_records(config.clone(), num_records, id);

Expand All @@ -79,7 +80,7 @@ impl ProducerWorker {
}

pub async fn send_batch(self) -> Result<()> {
println!("producer is sending batch");
debug!("producer is sending batch");

for record in self.records_to_send.into_iter() {
let _ = self
Expand Down Expand Up @@ -123,33 +124,3 @@ impl BenchmarkRecord {
Self { key, data }
}
}

/// Calculate the number of records each producer should send
fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 {
if id == 0 {
num_records / num_producers + num_records % num_producers
} else {
num_records / num_producers
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_get_num_records_per_producer() {
let num_producers = 3;
let num_records = 10;

assert_eq!(records_per_producer(0, num_producers, num_records), 4);
assert_eq!(records_per_producer(1, num_producers, num_records), 3);
assert_eq!(records_per_producer(2, num_producers, num_records), 3);

let num_producers = 3;
let num_records = 12;
assert_eq!(records_per_producer(0, num_producers, num_records), 4);
assert_eq!(records_per_producer(1, num_producers, num_records), 4);
assert_eq!(records_per_producer(2, num_producers, num_records), 4);
}
}
Loading

0 comments on commit f8522e2

Please sign in to comment.