In MPSC setup, crossbeam is more performant #7
Hi Venkat,
Interesting - thanks for writing the tests.
There are so many aspects that can impact the measurements, so I'll get some coffee and read your code + try it on my Intel MacBook. 😀
I never got around to writing benchmarks for the MPSC case - your code suggests that it would be a good idea, in case there are performance regressions in the multi-producer version.
I'll get back to you.
Kind regards,
Nicholas
On Sat, May 4, 2024 at 11:59, Venkat Raman ***@***.***> wrote:
Hi @nicholassm <https://github.com/nicholassm>,
Thanks for your awesome work on the disruptor Rust port. It was very easy to get started. With a simple 2P1C setup, I noticed that crossbeam is more performant than this library. Am I missing something?
*Results:*
On my 13-inch, M1, 2020, 16 GB MacBook Pro:
- disruptor takes ~230ms in SPSC & ~700ms in MPSC
- crossbeam takes ~270ms in SPSC & ~550ms in MPSC
$ make run-disruptor-demo-optimized
cargo run --release --bin disruptor_demo
Finished release [optimized] target(s) in 0.01s
Running `target/release/disruptor_demo`
SPSC Sum: 10000000, processed time: 238
MPSC Sum: 20000000, processed time: 709
$ make run-crossbeam-demo-optimized
cargo run --release --bin crossbeam_demo
Finished release [optimized] target(s) in 0.00s
Running `target/release/crossbeam_demo`
SPSC Sum: 10000000, processed time: 269
MPSC Sum: 20000000, processed time: 545
*Test Script:*
disruptor_demo.rs
use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::time::Instant;
use disruptor::{build_multi_producer, build_single_producer, BusySpin, Producer};

struct Event {
    val: i32,
}

// SPSC
fn spsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let factory = || Event { val: 0 }; // to initialize the disruptor's ring buffer
    let sink = Arc::new(AtomicI32::new(0)); // because we read and print the value from the main thread

    // Consumer
    let processor = {
        let sink = Arc::clone(&sink);
        move |event: &Event, _sequence: i64, _end_of_batch: bool| {
            sink.fetch_add(event.val, Ordering::SeqCst);
        }
    };

    let mut producer = build_single_producer(buf_size, factory, BusySpin)
        .handle_events_with(processor)
        .build();

    let start_time = Instant::now();
    // Publish into the Disruptor.
    thread::scope(|s| {
        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer.publish(|e| {
                    e.val = 1;
                });
            }
        });
    });

    let delta = Instant::now().duration_since(start_time).as_millis();
    let sum = sink.load(Ordering::SeqCst);
    println!("SPSC Sum: {}, processed time: {}", sum, delta);
}

// MPSC
fn mpsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let factory = || Event { val: 0 }; // to initialize the disruptor's ring buffer
    let sink = Arc::new(AtomicI32::new(0)); // because we read and print the value from the main thread

    // Consumer
    let processor = {
        let sink = Arc::clone(&sink);
        move |event: &Event, _sequence: i64, _end_of_batch: bool| {
            sink.fetch_add(event.val, Ordering::SeqCst);
        }
    };

    let mut producer1 = build_multi_producer(buf_size, factory, BusySpin)
        .handle_events_with(processor)
        .build();
    let mut producer2 = producer1.clone();

    let start_time = Instant::now();
    // Publish into the Disruptor from two producer threads.
    thread::scope(|s| {
        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer1.publish(|e| {
                    e.val = 1;
                });
            }
        });
        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer2.publish(|e| {
                    e.val = 1;
                });
            }
        });
    });

    let delta = Instant::now().duration_since(start_time).as_millis();
    let sum = sink.load(Ordering::SeqCst);
    println!("MPSC Sum: {}, processed time: {}", sum, delta);
}

fn main() {
    spsc_example();
    mpsc_example();
}
crossbeam_demo.rs
use crossbeam::channel::bounded;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};

fn spsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let (s, r) = bounded(buf_size);

    let start_time = Instant::now();
    // Producer
    let t1 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s.send(1).unwrap();
        }
    });

    let sink = Arc::new(AtomicI32::new(0)); // because we read and print the value from the main thread
    let sink_clone = Arc::clone(&sink);
    // Consumer (the loop ends when all senders are dropped)
    let c1: JoinHandle<()> = thread::spawn(move || {
        for msg in r {
            sink_clone.fetch_add(msg, Ordering::SeqCst);
        }
    });

    let _ = t1.join();
    let _ = c1.join();

    let delta = Instant::now().duration_since(start_time).as_millis();
    let sum = sink.load(Ordering::SeqCst);
    println!("SPSC Sum: {}, processed time: {}", sum, delta);
}

fn mpsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let (s, r) = bounded(buf_size);
    let s2 = s.clone();

    let start_time = Instant::now();
    // Producer 1
    let t1 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s.send(1).unwrap();
        }
    });
    // Producer 2
    let t2 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s2.send(1).unwrap();
        }
    });

    let sink = Arc::new(AtomicI32::new(0)); // because we read and print the value from the main thread
    let sink_clone = Arc::clone(&sink);
    // Consumer (the loop ends when all senders are dropped)
    let c1: JoinHandle<()> = thread::spawn(move || {
        for msg in r {
            sink_clone.fetch_add(msg, Ordering::SeqCst);
        }
    });

    let _ = t1.join();
    let _ = t2.join();
    let _ = c1.join();

    let delta = Instant::now().duration_since(start_time).as_millis();
    let sum = sink.load(Ordering::SeqCst);
    println!("MPSC Sum: {}, processed time: {}", sum, delta);
}

fn main() {
    spsc_example();
    mpsc_example();
}
Repo: https://github.com/Venkat2811/producer-consumer-rust
Hi @Venkat2811, I've read your code and it looks good: a very precise comparison of the two libraries. I ran your examples on my Quad-Core Intel Core i7 and these are the results (only minor variance over several runs):
As you can see, the Disruptor is much faster in the 2P1C setup. I don't know much about the M1 processor, other than that it's absolutely awesome for laptops due to everything being integrated on a single die and having circuits optimized for user applications. Therefore, I don't know how well it will perform for low-latency applications that are not for "the human eye".
Kind regards,
Hi Nicholas,
Thank you for taking the time to test the benchmark code. Credit goes to this post. Hope your coffee tasted much better while benchmarking this :)
I'm new to Rust; ChatGPT gave me 2 commands to run the Rust bin.
When I use the former, I get the same results as you shared above on my M1 as well. When using the latter command, crossbeam is still more performant. Could you confirm which one you used?
I ran the benchmarks on my Ubuntu workstation (AMD) as well and got a similar perf ratio as on the M1 Mac. My workstation's HW specs:
$ sudo lshw -class processor -class memory
*-firmware
description: BIOS
vendor: American Megatrends International, LLC.
physical id: 0
version: 2.30
date: 03/06/2023
size: 64KiB
capacity: 32MiB
capabilities: pci upgrade shadowing cdboot bootselect socketedrom edd int13floppynec int13floppytoshiba int13floppy360 int13floppy1200 int13floppy720 int13floppy2880 int5printscreen int9keyboard int14serial int17printer int10video usb biosbootspecification uefi
*-memory
description: System Memory
physical id: 11
slot: System board or motherboard
size: 64GiB
*-bank:0
description: 3600 MHz (0,3 ns) [empty]
product: Unknown
vendor: Unknown
physical id: 0
serial: Unknown
slot: DIMM 0
clock: 3600MHz (0.3ns)
*-bank:1
description: DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0,3 ns)
product: F4-3600C18-32GTZN
vendor: Unknown
physical id: 1
serial: 00000000
slot: DIMM 1
size: 32GiB
width: 64 bits
clock: 3600MHz (0.3ns)
*-bank:2
description: 3600 MHz (0,3 ns) [empty]
product: Unknown
vendor: Unknown
physical id: 2
serial: Unknown
slot: DIMM 0
clock: 3600MHz (0.3ns)
*-bank:3
description: DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0,3 ns)
product: F4-3600C18-32GTZN
vendor: Unknown
physical id: 3
serial: 00000000
slot: DIMM 1
size: 32GiB
width: 64 bits
clock: 3600MHz (0.3ns)
*-cache:0
description: L1 cache
physical id: 14
slot: L1 - Cache
size: 512KiB
capacity: 512KiB
clock: 1GHz (1.0ns)
capabilities: pipeline-burst internal write-back unified
configuration: level=1
*-cache:1
description: L2 cache
physical id: 15
slot: L2 - Cache
size: 4MiB
capacity: 4MiB
clock: 1GHz (1.0ns)
capabilities: pipeline-burst internal write-back unified
configuration: level=2
*-cache:2
description: L3 cache
physical id: 16
slot: L3 - Cache
size: 32MiB
capacity: 32MiB
clock: 1GHz (1.0ns)
capabilities: pipeline-burst internal write-back unified
configuration: level=3
*-cpu
description: CPU
product: AMD Ryzen 7 5800X 8-Core Processor
vendor: Advanced Micro Devices [AMD]
physical id: 17
bus info: cpu@0
version: 25.33.2
serial: Unknown
slot: AM4
size: 2200MHz
capacity: 4850MHz
width: 64 bits
clock: 100MHz
capabilities: lm fpu fpu_exception wp vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp x86-64 constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf rapl pni pclmulqdq monitor ssse3 fma cx16 sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs skinit wdt tce topoext perfctr_core perfctr_nb bpext perfctr_llc mwaitx cpb cat_l3 cdp_l3 hw_pstate ssbd mba ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 erms invpcid cqm rdt_a rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xgetbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local clzero irperf xsaveerptr rdpru wbnoinvd arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold avic v_vmsave_vmload vgif v_spec_ctrl umip pku ospke vaes vpclmulqdq rdpid overflow_recov succor smca fsrm cpufreq
configuration: cores=8 enabledcores=8 microcode=169873930 threads=16
Yes, I also plan to pin threads to cores and do some benchmarks on my workstation. Happy to share the results once done.
Thanks,
Ah, you are also running release - sorry, I just noticed. So it looks like the difference comes down to processor architecture then.
Yes, the coffee was great. :-) Indeed, I also ran the optimized version (using your Makefile). I see your desktop sports an AMD processor. I know AMD processors are also great, but I only have low-latency experience with Intel, so again I'm uncertain what would make Crossbeam run faster on your desktop. I think it just underlines the need for proper testing on a CPU typically used in a low-latency setting. :-)
Kind regards,
I agree. Thanks again!
No, thank *you* for the example/benchmark. As a closing remark, and as a Rustacean, I must say that it is superb that a general-purpose library such as Crossbeam has such excellent performance that it outperforms the highly specialized Disruptor on (here) two very different processor architectures.
Yes, I am pleasantly surprised with crossbeam as well. And also, optimizations done in disruptor ~12 years ago are still very relevant and industry leading :)
Do you think the examples I shared would be useful if added to this repo? Under an examples dir maybe? Let me know, happy to raise a PR.
Yes, absolutely. But I think it should be in the form of a benchmark to provide the most value (see the existing SPSC benchmark for inspiration).
The Criterion crate used here for benchmarks is great: it does the warmup, statistics, outlier detection, etc. for you. If you don't know it, I think you would love it.
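For readers unfamiliar with what Criterion automates, here is a rough, hand-rolled illustration of the same workflow (warm-up phase, repeated sampling, summary statistics). This is a std-only sketch of the idea, not Criterion's actual API:

```rust
use std::time::{Duration, Instant};

/// Measure `f` roughly the way a benchmark harness does: warm up first,
/// then take several timed samples and report (min, median, max).
fn measure<F: FnMut()>(mut f: F, warmup: Duration, samples: usize) -> (Duration, Duration, Duration) {
    // Warm-up phase: run until the warm-up budget is spent, so caches,
    // branch predictors and CPU frequency settle before sampling.
    let start = Instant::now();
    while start.elapsed() < warmup {
        f();
    }
    // Sampling phase: time each run individually.
    let mut times: Vec<Duration> = (0..samples)
        .map(|_| {
            let t = Instant::now();
            f();
            t.elapsed()
        })
        .collect();
    times.sort();
    (times[0], times[samples / 2], times[samples - 1])
}

fn main() {
    let (min, median, max) = measure(
        || {
            // Toy workload standing in for a channel benchmark.
            let s: u64 = (0..10_000u64).sum();
            std::hint::black_box(s); // keep the optimizer from deleting the work
        },
        Duration::from_millis(50),
        100,
    );
    println!("min: {:?}, median: {:?}, max: {:?}", min, median, max);
}
```

Criterion adds outlier detection and statistical comparison against previous runs on top of this basic loop, which is why it is worth using over a hand-rolled harness.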
Sure, happy to submit it as a benchmark!
Oh, a late insight: why do you use
I was just looking for the closest equivalent to Java's
On M1:
There is an improvement in SPSC. Earlier, with
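For context on the ordering question: the demo code earlier in the thread uses `Ordering::SeqCst` on the sink counter. For a plain counter whose final value is only read after the worker threads are joined, `Relaxed` is typically sufficient - the atomic read-modify-write already prevents lost increments, and `join()` provides the happens-before edge for the final load. A std-only sketch (an illustration of the general rule, not code from either library):

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

// Count with a given memory ordering. For a pure counter, Relaxed is enough:
// fetch_add is atomic (no lost updates), and join() ensures the final load
// sees all increments. SeqCst additionally imposes a single global order
// across unrelated atomics, which can cost extra on some architectures.
fn count_with(ordering: Ordering) -> i32 {
    let sink = Arc::new(AtomicI32::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let sink = Arc::clone(&sink);
            thread::spawn(move || {
                for _ in 0..10_000 {
                    sink.fetch_add(1, ordering);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    sink.load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(count_with(Ordering::Relaxed), 40_000);
    assert_eq!(count_with(Ordering::SeqCst), 40_000);
    println!("both orderings count correctly");
}
```

Whether the weaker ordering is measurable in the benchmark depends on the CPU; on x86 the difference mainly shows up around `SeqCst` stores, while on ARM (M1) weaker orderings can map to cheaper instructions.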
After reading the benchmark you submitted, I actually realized that it is measuring throughput and not latency. So, as we've established, Crossbeam in an MPSC setting can have better throughput on some architectures and worse on others. But what about latency - in particular when events happen after some pause?
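The throughput/latency distinction drawn here can be sketched with a small std-only harness that times individual events arriving after a quiet period (using `std::sync::mpsc` purely as a stand-in for either library; the numbers it prints are illustrative only):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Sketch of a burst/pause latency measurement: the producer sleeps between
// events, and we time each event from send to receive. After a pause, a
// parked consumer must be woken by the OS scheduler, which can add latency
// that a pure throughput benchmark (a saturated pipeline) never shows.
fn burst_latencies(bursts: usize, pause: Duration) -> Vec<Duration> {
    let (tx, rx) = mpsc::channel::<Instant>();
    let producer = thread::spawn(move || {
        for _ in 0..bursts {
            thread::sleep(pause); // simulate a quiet period between events
            tx.send(Instant::now()).unwrap();
        }
        // tx is dropped here, which ends the receiver's iteration.
    });
    let mut latencies = Vec::with_capacity(bursts);
    for sent_at in rx.iter() {
        latencies.push(sent_at.elapsed());
    }
    producer.join().unwrap();
    latencies
}

fn main() {
    for (i, l) in burst_latencies(10, Duration::from_millis(5)).iter().enumerate() {
        println!("event {}: {:?}", i, l);
    }
}
```

A busy-spinning consumer (like the disruptor's BusySpin strategy) would show much flatter per-event latencies in this scenario, at the cost of burning a core while idle.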
From this project's README:
I was not aware of this. Sure, I would be happy to run your updated benchmarks. I also wrote a script for SPSC:
Benchmarking crossbeam_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 13.1s, or reduce sample count to 30.
std_spsc time: [122.41 ms 124.10 ms 125.63 ms]
Found 15 outliers among 100 measurements (15.00%)
15 (15.00%) low mild
Benchmarking crossbeam_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.2s, or reduce sample count to 10.
crossbeam_spsc time: [263.89 ms 271.59 ms 279.42 ms]
Found 10 outliers among 100 measurements (10.00%)
7 (7.00%) low mild
3 (3.00%) high mild
Benchmarking crossbeam_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 36.4s, or reduce sample count to 10.
disruptor_spsc time: [236.87 ms 263.13 ms 289.95 ms]
MPSC:
Benchmarking std_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.5s, or reduce sample count to 10.
std_mpsc time: [278.84 ms 283.50 ms 288.88 ms]
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high severe
Benchmarking crossbeam_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 54.7s, or reduce sample count to 10.
crossbeam_mpsc time: [507.17 ms 519.58 ms 532.98 ms]
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) low mild
1 (1.00%) high mild
2 (2.00%) high severe
Benchmarking disruptor_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 68.0s, or reduce sample count to 10.
disruptor_mpsc time: [704.01 ms 718.82 ms 735.05 ms]
Found 5 outliers among 100 measurements (5.00%)
1 (1.00%) low mild
3 (3.00%) high mild
1 (1.00%) high severe
I'm also making changes to use
For these libraries, I think there should be a running benchmark measuring performance against each new version across different processor architectures. I see
Yeah, this might be the reason.
Hi Venkat,
Thanks for the interesting results regarding
I just pushed a commit with minor changes to the
Kind regards,
Hi Venkat,
I've done a first stab at an
Kind regards,
Hello Nicholas,
With bursts & pauses, disruptor is faster in
Thanks,
Hi Venkat,
Excellent - the world makes sense again.
I think the reason the disruptor has lower latency in the burst: 1, pause: 0 ms case is that, first and foremost, the crossbeam channel does the synchronization in a different way. Secondly, due to its "channel nature", crossbeam also has to check if the channel is disconnected, etc.
Kind regards,
Hi Venkat,
Thanks for all the ping-pong. I'm closing this issue, but feel free to open a new one if you have new findings.
Kind regards,
Hi Nicholas,
Yes, these are the reasons for the Disruptor to be faster. But in bench/counters.rs' mpsc, disruptor was slower on M1. I think it's the same as burst: 1, pause: 0 ms in your benchmark, where disruptor is faster. I'm not sure what's the cause of this discrepancy.
Yes, thanks to you too. It has been interesting for me too & an excellent project for me to get started with Rust :)
Thanks,
Venkat
I think it actually makes sense that the mpsc benchmark has lower latency on M1 than the counters bench.
The reason is that in the counters bench, the consumer thread is processing many events without interruption in each measurement. In the mpsc bench, there are many iterations done for each measurement (by Criterion) and thus many pauses for the consumer thread in crossbeam - but not for the consumer thread in the disruptor, because it busy-spins.
Kind regards,
Nicholas
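The busy-spin behaviour described here can be illustrated with a minimal std-only flag wait: the consumer polls instead of parking, so a pause before the event costs it nothing at wake-up time. This is a sketch of the wait-strategy idea only, not the disruptor crate's implementation:

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// A busy-spinning consumer never parks: it polls until data is ready,
// trading 100% use of one core for the lowest possible wakeup latency.
// A blocking channel consumer would instead park and pay an OS wakeup
// cost after every pause.
fn spin_wait_for(flag: &AtomicBool) {
    while !flag.load(Ordering::Acquire) {
        hint::spin_loop(); // hint to the CPU that we are in a spin loop
    }
}

fn main() {
    let ready = Arc::new(AtomicBool::new(false));
    let r = Arc::clone(&ready);
    let consumer = thread::spawn(move || {
        spin_wait_for(&r);
        println!("consumer saw the event without being parked");
    });
    thread::sleep(Duration::from_millis(5)); // the "pause" between events
    ready.store(true, Ordering::Release);
    consumer.join().unwrap();
}
```

The trade-off is exactly the one discussed in this thread: under saturated throughput the parked/woken consumer barely matters, but with bursts and pauses the spinning consumer reacts immediately.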