Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: benchmark dead channels #4359

Merged
merged 2 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/fluvio-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ publish = false
anyhow = { workspace = true }
async-channel = { workspace = true }
bytesize = { workspace = true, features = ['serde'] }
chrono = { workspace = true, features = ['serde']}
clap = { workspace = true, features = ["std","derive"] }
derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
tokio = { workspace = true, features = ['macros'] }
rayon = { workspace = true }
tokio = { workspace = true, features = ['sync', 'macros'] }
serde = { workspace = true , features = ['derive'] }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/fluvio-benchmark/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ pub enum BenchmarkMode {
}

pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> {
println!("# Fluvio Benchmark Results");

if let Some(mode) = opt.benchmark {
BenchmarkDriver::run_benchmark(mode).await?;
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-benchmark/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytesize::ByteSize;
use crate::utils;

const DEFAULT_BATCH_SIZE: &str = "16kib";
const DEFAULT_QUEUE_SIZE: u64 = 10;
const DEFAULT_QUEUE_SIZE: u64 = 100;
const DEFAULT_MAX_REQUEST_SIZE: &str = "32mib";
const DEFAULT_LINGER: &str = "0ms";
const DEFAULT_SERVER_TIMEOUT: &str = "5000ms";
Expand Down
177 changes: 91 additions & 86 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use std::sync::Arc;

use anyhow::Result;
use async_channel::{unbounded, Receiver};
use async_channel::unbounded;

use bytesize::ByteSize;
use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio_future::{future::timeout, task::spawn, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use tokio::select;
use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tracing::debug;

use crate::{
config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, utils,
config::ProducerConfig,
producer_worker::ProducerWorker,
stats_collector::{EndProducerStat, StatCollector, Stats},
utils,
};

pub struct ProducerBenchmark {}
Expand All @@ -28,7 +35,7 @@ impl ProducerBenchmark {
admin.create(topic_name.clone(), false, new_topic).await?;
}

println!("created topic {}", topic_name);
debug!("created topic {}", topic_name);
let result = ProducerBenchmark::run_samples(config.clone()).await;

sleep(std::time::Duration::from_millis(100)).await;
Expand All @@ -40,108 +47,106 @@ impl ProducerBenchmark {
// Clean up topic
if config.delete_topic {
admin.delete::<TopicSpec>(topic_name.clone()).await?;
print!("Topic deleted successfully {}", topic_name.clone());
debug!("Topic deleted successfully {}", topic_name.clone());
}

Ok(())
}

async fn run_samples(config: ProducerConfig) -> Result<()> {
let mut tx_controls = Vec::new();
let mut workers_jh = Vec::new();

let (stats_sender, stats_receiver) = unbounded();
let (end_sender, end_receiver) = unbounded();
let (end_sender, mut end_receiver) = broadcast::channel(2);
let end_sender = Arc::new(end_sender);
let stat_collector =
StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone());

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
println!("starting up producer {}", producer_id);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
));
StatCollector::create(config.num_records, stats_sender.clone(), end_sender.clone());

tx_control.send(ControlMessage::SendBatch).await?;
tx_controls.push(tx_control);
workers_jh.push(jh);
}
Self::setup_producers(config.clone(), stat_collector).await;
println!("Benchmark started");
Self::print_progress_on_backgroud(stats_receiver).await;
Self::print_benchmark_on_end(&mut end_receiver).await;
println!("Benchmark completed");

loop {
select! {
stat_rx = stats_receiver.recv() => {
if let Ok(stat) = stat_rx {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.record_send, stat.records_per_sec, human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
);
}
}
end = end_receiver.recv() => {
if let Ok(end) = end {
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency",
utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0))));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {:.2}ms p{percentile:4.2}",
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(percentile)),
));
}
println!();
println!("{}", latency_yaml);

let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
println!(
"{} total records sent, {} records/sec: ({}/sec) ",
end.total_records, end.records_per_sec, human_readable_bytes
);
}
break;
}
Ok(())
}

async fn setup_producers(config: ProducerConfig, stat_collector: StatCollector) {
spawn(async move {
let worker_futures = FuturesUnordered::new();
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
let config = config.clone();
let jh = spawn(timeout(config.worker_timeout, async move {
debug!("starting up producer {}", producer_id);
let worker = ProducerWorker::new(producer_id, config, event_sender)
.await
.expect("create producer worker");
ProducerDriver::main_loop(worker).await.expect("main loop");
}));

worker_futures.push(jh);
}
}

// Wait for all producers to finish
for jh in workers_jh {
jh.await??;
}
for worker in worker_futures.collect::<Vec<_>>().await {
worker.expect("producer worker failed");
}
});
}

// Print stats
println!("Benchmark completed");
/// Spawns a background task that prints a one-line progress report for every
/// interim `Stats` sample received from the stat collector. The task ends
/// when the stats channel is closed (recv returns Err).
///
/// NOTE(review): "backgroud" looks like a misspelling of "background", and
/// `nanos_to_ms_pritable` of "printable" — confirm and rename in a follow-up,
/// since both identifiers are referenced elsewhere.
async fn print_progress_on_backgroud(stats_receiver: async_channel::Receiver<Stats>) {
    spawn(async move {
        // Drain interim samples until the sender side is dropped.
        while let Ok(stat) = stats_receiver.recv().await {
            let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
            println!(
                "{} records sent, {} records/sec: ({}/sec), {} avg latency, {} max latency",
                stat.record_send,
                stat.records_per_sec,
                human_readable_bytes,
                utils::nanos_to_ms_pritable(stat.latency_avg),
                utils::nanos_to_ms_pritable(stat.latency_max)
            );
        }
    });
}

Ok(())
/// Blocks until the final `EndProducerStat` arrives on the broadcast channel,
/// then prints the end-of-run summary: avg/max latency, p50/p95/p99
/// percentiles from the HDR histogram, and overall throughput.
///
/// If the sender is dropped before an end stat is published, `recv` returns
/// an error and nothing is printed.
async fn print_benchmark_on_end(end_receiver: &mut broadcast::Receiver<EndProducerStat>) {
    if let Ok(end) = end_receiver.recv().await {
        // sleep enough time to make sure all stats are printed
        sleep(std::time::Duration::from_secs(1)).await;
        // Build the latency summary line incrementally.
        let mut latency_yaml = String::new();
        latency_yaml.push_str(&format!(
            "{} avg latency, {} max latency",
            utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64),
            // quantile 1.0 == maximum recorded latency
            utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0))
        ));
        for percentile in [0.5, 0.95, 0.99] {
            latency_yaml.push_str(&format!(
                ", {} p{percentile:4.2}",
                utils::nanos_to_ms_pritable(
                    end.latencies_histogram.value_at_quantile(percentile)
                ),
            ));
        }
        println!();
        println!("{}", latency_yaml);

        let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
        println!(
            "{} total records sent, {} records/sec: ({}/sec), total time: {}",
            end.total_records,
            end.records_per_sec,
            human_readable_bytes,
            utils::pretty_duration(end.elapsed)
        );
    }
}
}

struct ProducerDriver;

impl ProducerDriver {
async fn main_loop(rx: Receiver<ControlMessage>, worker: ProducerWorker) -> Result<()> {
//loop {
match rx.recv().await? {
ControlMessage::SendBatch => {
println!("producer send batch");
if let Err(err) = worker.send_batch().await {
println!("producer send batch error: {:#?}", err);
}
}
};
//}
/// Drives a single producer worker to completion: consumes the worker and
/// sends its entire pre-generated batch once. Errors from `send_batch`
/// propagate to the caller via `?`.
async fn main_loop(worker: ProducerWorker) -> Result<()> {
    worker.send_batch().await?;
    Ok(())
}
}

/// Control messages formerly sent to producer workers over a channel.
/// NOTE(review): after this refactor `main_loop` no longer takes a receiver,
/// so this enum appears to be dead code — confirm no remaining users and
/// remove it.
#[derive(Clone, Copy, Debug)]
enum ControlMessage {
    SendBatch,
}
35 changes: 3 additions & 32 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fluvio::{
TopicProducerConfigBuilder, TopicProducerPool,
};
use futures_util::future::BoxFuture;
use tracing::debug;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand Down Expand Up @@ -68,7 +69,7 @@ impl ProducerWorker {
.topic_producer_with_config(config.topic_name.clone(), fluvio_config)
.await?;

let num_records = records_per_producer(id, config.num_producers, config.num_records);
let num_records = utils::records_per_producer(id, config.num_producers, config.num_records);

let records_to_send = create_records(config.clone(), num_records, id);

Expand All @@ -79,7 +80,7 @@ impl ProducerWorker {
}

pub async fn send_batch(self) -> Result<()> {
println!("producer is sending batch");
debug!("producer is sending batch");

for record in self.records_to_send.into_iter() {
let _ = self
Expand Down Expand Up @@ -123,33 +124,3 @@ impl BenchmarkRecord {
Self { key, data }
}
}

/// Calculate the number of records each producer should send
/// Splits `num_records` as evenly as possible across `num_producers`;
/// producer 0 additionally absorbs the remainder, so the per-producer
/// counts always sum to exactly `num_records`.
///
/// Panics if `num_producers` is zero (division by zero).
fn records_per_producer(id: u64, num_producers: u64, num_records: u64) -> u64 {
    let even_share = num_records / num_producers;
    let leftover = num_records % num_producers;
    match id {
        // First producer picks up whatever doesn't divide evenly.
        0 => even_share + leftover,
        _ => even_share,
    }
}

// Unit tests for `records_per_producer`.
#[cfg(test)]
mod tests {
    use super::*;

    // Producer 0 takes the remainder; all others get the even share,
    // so the per-producer counts always sum to `num_records`.
    #[test]
    fn test_get_num_records_per_producer() {
        let num_producers = 3;
        let num_records = 10;

        // 10 split 3 ways: 4 + 3 + 3
        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
        assert_eq!(records_per_producer(1, num_producers, num_records), 3);
        assert_eq!(records_per_producer(2, num_producers, num_records), 3);

        // Evenly divisible case: everyone gets the same share (12 = 4 + 4 + 4)
        let num_producers = 3;
        let num_records = 12;
        assert_eq!(records_per_producer(0, num_producers, num_records), 4);
        assert_eq!(records_per_producer(1, num_producers, num_records), 4);
        assert_eq!(records_per_producer(2, num_producers, num_records), 4);
    }
}
Loading
Loading