feat: add latency table on fluvio-benchmark (#4333)
fraidev authored Jan 18, 2025
1 parent d8cd442 commit 5e6a65b
Showing 7 changed files with 143 additions and 40 deletions.
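For orientation: the core of this change is to record each record's send-to-ack latency in an HDR histogram and print a one-line summary with the average, maximum, and a few percentiles. The sketch below shows just that reporting step in isolation; it assumes the hdrhistogram crate (which this commit adds to Cargo.toml) and uses made-up latency values.

use hdrhistogram::Histogram;

fn main() {
    // Two significant digits of precision, as the benchmark itself uses.
    let mut hist = Histogram::<u64>::new(2).unwrap();

    // Pretend these are per-record send-to-ack latencies, in nanoseconds.
    for nanos in [800_000u64, 1_200_000, 1_500_000, 9_000_000] {
        hist.record(nanos).unwrap();
    }

    let to_ms = |nanos: u64| nanos as f64 / 1_000_000.0;
    print!(
        "{:.2}ms avg latency, {:.2}ms max latency",
        to_ms(hist.mean() as u64),
        to_ms(hist.value_at_quantile(1.0))
    );
    for p in [0.5, 0.95, 0.99] {
        print!(", {:.2}ms p{p:4.2}", to_ms(hist.value_at_quantile(p)));
    }
    println!();
}

The diffs below wire this idea into the producer workers and the stats collector.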
2 changes: 2 additions & 0 deletions Cargo.lock

Generated lockfile; diff not rendered.

2 changes: 2 additions & 0 deletions crates/fluvio-benchmark/Cargo.toml
@@ -17,8 +17,10 @@ clap = { workspace = true, features = ["std","derive"] }
derive_builder = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
hdrhistogram = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
tokio = { workspace = true, features = ['macros'] }
rayon = { workspace = true }
serde = { workspace = true , features = ['derive'] }
serde_yaml = { workspace = true }
6 changes: 3 additions & 3 deletions crates/fluvio-benchmark/src/config.rs
@@ -7,9 +7,9 @@ use bytesize::ByteSize;

use crate::utils;

const DEFAULT_BATCH_SIZE: ByteSize = ByteSize(16_384);
const DEFAULT_BATCH_SIZE: ByteSize = ByteSize::kib(16);
const DEFAULT_QUEUE_SIZE: u64 = 10;
const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize(33_554_432);
const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize::mib(32);
const DEFAULT_LINGER: &str = "0ms";
const DEFAULT_SERVER_TIMEOUT: &str = "5000ms";
const DEFAULT_COMPRESSION: Compression = Compression::None;
@@ -19,7 +19,7 @@ const DEFAULT_WORKER_TIMEOUT: &str = "3000s";
const DEFAULT_RECORD_KEY_ALLOCATION_STRATEGY: RecordKeyAllocationStrategy =
RecordKeyAllocationStrategy::NoKey;
const DEFAULT_NUM_PRODUCERS: u64 = 1;
const DEFAULT_RECORD_SIZE: ByteSize = ByteSize(5120);
const DEFAULT_RECORD_SIZE: ByteSize = ByteSize::kib(5);
const DEFAULT_NUM_RECORDS: u64 = 10_000;
const DEFAULT_PARTITIONS: u32 = 1;
const DEFAULT_REPLICAS: u32 = 1;
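The config changes only swap numeric literals for bytesize's named constructors; the default values themselves are unchanged. A quick check of that equivalence, as a standalone sketch using the bytesize crate:

use bytesize::ByteSize;

fn main() {
    // The named constructors encode exactly the same byte counts as the old literals.
    assert_eq!(ByteSize::kib(16).as_u64(), 16_384); // DEFAULT_BATCH_SIZE
    assert_eq!(ByteSize::mib(32).as_u64(), 33_554_432); // DEFAULT_MAX_REQUEST_SIZE
    assert_eq!(ByteSize::kib(5).as_u64(), 5_120); // DEFAULT_RECORD_SIZE
    println!("defaults unchanged; only the notation changed");
}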
62 changes: 47 additions & 15 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
@@ -1,9 +1,14 @@
use anyhow::Result;
use async_channel::{unbounded, Receiver};

use bytesize::ByteSize;
use fluvio_future::{task::spawn, future::timeout, timer::sleep};
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin};
use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector};
use tokio::select;

use crate::{
utils, config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector,
};

pub struct ProducerBenchmark {}

@@ -49,10 +54,16 @@ impl ProducerBenchmark {
let mut workers_jh = Vec::new();

let (stat_sender, stat_receiver) = unbounded();
let (latency_sender, latency_receiver) = unbounded();
// Set up producers
for producer_id in 0..config.shared_config.load_config.num_producers {
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(10000, stat_sender.clone());
let stat_collector = StatCollector::create(
config.batch_size.as_u64(),
config.shared_config.load_config.num_records,
latency_sender.clone(),
stat_sender.clone(),
);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
let jh = spawn(timeout(
@@ -64,20 +75,41 @@
tx_controls.push(tx_control);
workers_jh.push(jh);
}
println!("benchmark started");

// delay 1 second so producers can start
sleep(std::time::Duration::from_secs(1)).await;

while let Ok(stat) = stat_receiver.recv().await {
if stat.end {
break;
println!("Benchmark started");

loop {
select! {
hist = latency_receiver.recv() => {
if let Ok(hist) = hist {
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency",
utils::nanos_to_ms_pritable(hist.mean() as u64),
utils::nanos_to_ms_pritable(hist.value_at_quantile(1.0))));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {:.2}ms p{percentile:4.2}",
utils::nanos_to_ms_pritable(hist.value_at_quantile(percentile)),
));
}
println!("{}", latency_yaml);
}
break;
}
stat_rx = stat_receiver.recv() => {
if let Ok(stat) = stat_rx {
// latency_receiver is finishing the benchmark now
//if stat.end {
// break;
//}
let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.total_records_send, stat.records_per_sec, human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
);
}
}
}
let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0);
println!(
"total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ",
stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes
);
}

// Wait for all producers to finish
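The new reporting loop multiplexes two channels: periodic throughput stats from the collector and a single final latency histogram. A condensed, self-contained sketch of that pattern follows; the Stat type is simplified, the numbers are made up, and it assumes tokio (macros, rt, time features), async-channel, and hdrhistogram as dependencies.

use async_channel::unbounded;
use hdrhistogram::Histogram;
use tokio::select;

struct Stat {
    total_records_send: u64,
    records_per_sec: f64,
}

#[tokio::main]
async fn main() {
    let (stat_tx, stat_rx) = unbounded::<Stat>();
    let (latency_tx, latency_rx) = unbounded::<Histogram<u64>>();

    // Stand-in for the producer workers: one throughput stat, then the final histogram.
    tokio::spawn(async move {
        stat_tx
            .send(Stat { total_records_send: 5_000, records_per_sec: 2_500.0 })
            .await
            .ok();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let mut hist = Histogram::<u64>::new(2).unwrap();
        hist.record(1_200_000).unwrap(); // 1.2 ms, in nanoseconds
        latency_tx.send(hist).await.ok();
    });

    loop {
        select! {
            hist = latency_rx.recv() => {
                if let Ok(hist) = hist {
                    println!("p99 latency: {} ns", hist.value_at_quantile(0.99));
                }
                break; // the final histogram ends the reporting loop
            }
            stat = stat_rx.recv() => {
                if let Ok(stat) = stat {
                    println!(
                        "{} records sent ({} records/sec)",
                        stat.total_records_send, stat.records_per_sec
                    );
                }
            }
        }
    }
}

Ending the loop on the histogram rather than on an explicit end flag is also why the old stat.end check is left commented out in the diff above.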
10 changes: 6 additions & 4 deletions crates/fluvio-benchmark/src/producer_worker.rs
@@ -68,15 +68,17 @@ impl ProducerWorker {

for record in self.records_to_send.into_iter() {
self.stat.start();
self.fluvio_producer
let time = std::time::Instant::now();
let send_out = self
.fluvio_producer
.send(record.key, record.data.clone())
.await?;
self.stat.record_record_send(record.data.len() as u64).await;
}

self.stat.send_out((send_out, time));
self.stat.add_record(record.data.len() as u64).await;
}
self.fluvio_producer.flush().await?;
self.stat.finish();
println!("producer is done sending batch");

Ok(())
}
95 changes: 77 additions & 18 deletions crates/fluvio-benchmark/src/stats_collector.rs
@@ -1,48 +1,96 @@
use std::time::Instant;
use std::{sync::Arc, time::Instant};

use async_channel::Sender;
use fluvio::ProduceOutput;
use fluvio_future::{sync::Mutex, task::spawn};
use hdrhistogram::Histogram;

#[derive(Debug)]
pub(crate) struct ProducerStat {
record_send: u64,
record_bytes: u64,
start_time: Instant,
output_tx: Sender<(ProduceOutput, Instant)>,
histogram: Arc<Mutex<Histogram<u64>>>,
}

impl ProducerStat {
pub(crate) fn new() -> Self {
pub(crate) fn new(num_records: u64, latency_sender: Sender<Histogram<u64>>) -> Self {
let (output_tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>();
let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap()));

ProducerStat::track_latency(num_records, latency_sender, rx, histogram.clone());

Self {
record_send: 0,
record_bytes: 0,
start_time: Instant::now(),
output_tx,
histogram,
}
}

fn track_latency(
num_records: u64,
latency_sender: Sender<Histogram<u64>>,
rx: async_channel::Receiver<(ProduceOutput, Instant)>,
histogram: Arc<Mutex<Histogram<u64>>>,
) {
spawn(async move {
while let Ok((send_out, time)) = rx.recv().await {
let hist = histogram.clone();
let latency_sender = latency_sender.clone();
spawn(async move {
let _o = send_out.wait().await.unwrap();
let duration = time.elapsed();
let mut hist = hist.lock().await;
hist.record(duration.as_nanos() as u64).expect("record");

if hist.len() >= num_records {
latency_sender.send(hist.clone()).await.expect("send");
}
});
}
});
}

pub(crate) fn calcuate(&mut self) -> Stat {
let elapse = self.start_time.elapsed().as_millis();
let message_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round();
let records_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round();
let bytes_per_sec = (self.record_bytes as f64 / elapse as f64) * 1000.0;

let hist = self.histogram.lock_blocking();
let latency_avg = hist.mean() as u64;
let latency_max = hist.value_at_quantile(1.0);

Stat {
message_per_sec,
records_per_sec,
bytes_per_sec,
total_bytes_send: self.record_bytes,
total_message_send: self.record_send,
end: false,
_total_bytes_send: self.record_bytes,
total_records_send: self.record_send,
latency_avg,
latency_max,
_end: false,
}
}

pub(crate) fn set_current_time(&mut self) {
self.start_time = Instant::now();
}

pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
self.output_tx.try_send(out).expect("send out");
}
}

pub(crate) struct Stat {
pub message_per_sec: f64,
pub records_per_sec: f64,
pub bytes_per_sec: f64,
pub total_bytes_send: u64,
pub total_message_send: u64,
pub end: bool,
pub _total_bytes_send: u64,
pub total_records_send: u64,
pub latency_avg: u64,
pub latency_max: u64,
pub _end: bool,
}

pub(crate) struct StatCollector {
@@ -53,9 +101,14 @@ }
}

impl StatCollector {
pub(crate) fn create(batch_size: u64, sender: Sender<Stat>) -> Self {
pub(crate) fn create(
batch_size: u64,
num_records: u64,
latency_sender: Sender<Histogram<u64>>,
sender: Sender<Stat>,
) -> Self {
Self {
current: ProducerStat::new(),
current: ProducerStat::new(num_records, latency_sender),
batch_size,
current_record: 0,
sender,
@@ -68,7 +121,11 @@ }
}
}

pub(crate) async fn record_record_send(&mut self, bytes: u64) {
pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
self.current.send_out(out);
}

pub(crate) async fn add_record(&mut self, bytes: u64) {
self.current.record_send += 1;
self.current.record_bytes += bytes;
self.current_record += 1;
@@ -84,11 +141,13 @@

pub(crate) fn finish(&mut self) {
let end_record = Stat {
message_per_sec: 0.0,
records_per_sec: 0.0,
bytes_per_sec: 0.0,
total_bytes_send: 0,
total_message_send: 0,
end: true,
_total_bytes_send: 0,
total_records_send: 0,
latency_avg: 0,
latency_max: 0,
_end: true,
};

self.sender.try_send(end_record).expect("send end stats");
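The collector's latency tracking fans out one task per in-flight send: each task waits for the broker ack, records the elapsed nanoseconds into a shared histogram, and the full histogram is sent to the reporter once every record has been acknowledged. Below is a trimmed-down sketch of that flow; fake_ack is a hypothetical stand-in for awaiting fluvio's ProduceOutput, and tokio primitives replace fluvio_future's here.

use std::{sync::Arc, time::Instant};

use async_channel::{unbounded, Receiver, Sender};
use hdrhistogram::Histogram;
use tokio::sync::Mutex;

// Hypothetical stand-in for send_out.wait() on a fluvio ProduceOutput.
async fn fake_ack() {}

fn track_latency(
    num_records: u64,
    latency_tx: Sender<Histogram<u64>>,
    rx: Receiver<Instant>,
    histogram: Arc<Mutex<Histogram<u64>>>,
) {
    tokio::spawn(async move {
        while let Ok(sent_at) = rx.recv().await {
            let hist = histogram.clone();
            let latency_tx = latency_tx.clone();
            tokio::spawn(async move {
                fake_ack().await; // wait for the ack before measuring
                let nanos = sent_at.elapsed().as_nanos() as u64;
                let mut hist = hist.lock().await;
                hist.record(nanos).expect("record");
                // Once every record is acknowledged, publish the histogram.
                if hist.len() >= num_records {
                    latency_tx.send(hist.clone()).await.expect("send");
                }
            });
        }
    });
}

#[tokio::main]
async fn main() {
    let (out_tx, out_rx) = unbounded();
    let (latency_tx, latency_rx) = unbounded();
    let histogram = Arc::new(Mutex::new(Histogram::<u64>::new(2).unwrap()));

    track_latency(3, latency_tx, out_rx, histogram);
    for _ in 0..3 {
        out_tx.send(Instant::now()).await.unwrap();
    }

    let final_hist = latency_rx.recv().await.unwrap();
    println!("max latency: {} ns", final_hist.value_at_quantile(1.0));
}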
6 changes: 6 additions & 0 deletions crates/fluvio-benchmark/src/utils.rs
@@ -1,3 +1,5 @@
use std::time::Duration;

use rand::{distributions::Alphanumeric, Rng};
use rand::{RngCore, SeedableRng};
use rand_xoshiro::Xoshiro256PlusPlus;
@@ -44,3 +46,7 @@ pub fn generate_random_string_vec(num: usize, size: usize) -> Vec<String> {

random_strings
}

pub fn nanos_to_ms_pritable(nano: u64) -> f64 {
Duration::from_nanos(nano).as_secs_f64() * 1000.0
}
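The new helper just converts nanoseconds to fractional milliseconds for display. A tiny usage sketch, with a local copy of the function since it lives inside the benchmark crate:

use std::time::Duration;

fn nanos_to_ms(nanos: u64) -> f64 {
    Duration::from_nanos(nanos).as_secs_f64() * 1000.0
}

fn main() {
    // 1_500_000 ns -> 1.5 ms
    println!("{:.2}ms", nanos_to_ms(1_500_000)); // prints "1.50ms"
}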
