Skip slots with active reading `Ref`s #80

Conversation
…_ref`) to this slot to be dropped. Add ability to `Core::push_ref` to skip such slots and attempt to reuse them on the next lap.
Thanks for the PR, I'm definitely in favor of making this change if it avoids a potentially long-running spin loop!

I had some suggestions about the implementation; in particular, I think we might be better off storing the presence of the reader as a bit in the slot's `state` field, so the entire state can be read from and written to in a single atomic operation, rather than requiring two reads to synchronize.

I'd really like to see some `loom` tests exercising the new behavior, if possible. Also, if you don't mind running the crate's benchmarks before and after making this change, I'd love to see if it results in a substantial performance improvement.

Thanks again!
src/lib.rs (Outdated)

where
    R: Recycle<T>,
tiny nit: is this how rustfmt wanted us to format this? let's make sure it's rustfmt-approved before merging, please :)
@@ -101,6 +103,7 @@ struct Core {
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    state: AtomicUsize,
    has_reader: AtomicBool,
is it really necessary to add an entire additional `AtomicBool` (a whole word) to store what is essentially one bit of information? would it make sense to store this as a single bit in the `state` field instead? we could use the first bit to indicate whether there is a reader, and store the generation of the slot in the remaining 63 bits, accessing the generation by shifting the value by 1.

this would have a couple of advantages: one, it would decrease the size of each slot by a word --- the memory usage isn't a huge deal, but it does scale with the size of the buffer, which could be meaningful.

more importantly, though, the current approach stores two separate pieces of shared state in the `Slot` type, which are not both accessed atomically. this means there is a potential for racy behavior when one value has been updated and the other has not yet been. if we store the presence of a reader as a single bit in the `state` field, both values are always read and written in a single atomic operation.

on the other hand, the approach i'm describing introduces some extra complexity to the code, since the presence of the reader field is not obvious from the struct definition and is instead hidden behind a bitfield... if there isn't a potential for racy behavior here, it may be better to keep this in a separate field.
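
For concreteness, here is a minimal sketch of the bit-packing idea described above. The `PackedSlotState` type, the constant names, and the orderings are hypothetical illustrations, not thingbuf's actual internals.

use core::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical layout: the lowest bit flags an active reader, the rest holds the generation.
const HAS_READER: usize = 1;
const GEN_SHIFT: u32 = 1;

struct PackedSlotState(AtomicUsize);

impl PackedSlotState {
    /// Read the generation and the reader flag in one atomic operation.
    fn load(&self) -> (usize, bool) {
        let s = self.0.load(SeqCst);
        (s >> GEN_SHIFT, (s & HAS_READER) != 0)
    }

    /// Mark the slot as having an active reader without touching the generation.
    fn set_reader(&self) {
        self.0.fetch_or(HAS_READER, SeqCst);
    }

    /// Clear the reader flag and advance to `next_gen` in a single store.
    fn release(&self, next_gen: usize) {
        self.0.store(next_gen << GEN_SHIFT, SeqCst);
    }
}

Because both pieces of information live in the same word, no observer can ever see a generation/reader combination that was never actually stored.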
let state = test_dbg!(slot.state.load(SeqCst));
// slot is writable
if test_dbg!(state == tail) {
    // Move the tail index forward by 1.
    let next_tail = self.next(idx, gen);
    // try to advance the tail
    match test_dbg!(self
        .tail
        .compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
    {
        Ok(_) => {
            // We got the slot! It's now okay to write to it
            test_println!("claimed tail slot [{}]", idx);
            // Claim exclusive ownership over the slot
            let ptr = slot.value.get_mut();

            // Initialize or recycle the element.
            unsafe {
                // Safety: we have just claimed exclusive ownership over
                // this slot.
                let ptr = ptr.deref();
                if gen == 0 {
                    ptr.write(recycle.new_element());
                    test_println!("-> initialized");
                } else {
                    // Safety: if the generation is > 0, then the
                    // slot has already been initialized.
                    recycle.recycle(ptr.assume_init_mut());
                    test_println!("-> recycled");
                }
            }

            test_println!("advanced tail {} to {}", tail, next_tail);
            test_println!("claimed slot [{}]", idx);
            let has_reader = test_dbg!(slot.has_reader.load(SeqCst));
it looks like it was necessary to change these loads from `Acquire` to `SeqCst` because we need the loads of the `state` and `has_reader` fields to have a happens-before relationship? if we apply my above suggestion about merging the has-reader bit into the `state` variable, we could avoid the need to synchronize between two loads, and we could still perform `Acquire` loads here. this might improve performance a bit...
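
As a hedged illustration of that point, reusing the hypothetical bit layout from the earlier sketch: a single `Acquire` load observes the generation and the reader flag together, so there is no second load to synchronize with.

use core::sync::atomic::{AtomicUsize, Ordering::Acquire};

// Constants as in the earlier (hypothetical) sketch.
const HAS_READER: usize = 1;
const GEN_SHIFT: u32 = 1;

fn read_gen_and_reader(state: &AtomicUsize) -> (usize, bool) {
    // One load yields a consistent snapshot of both values.
    let s = state.load(Acquire);
    (s >> GEN_SHIFT, (s & HAS_READER) != 0)
}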
#[test]
fn spsc_skip_slot() {
    let (tx, rx) = blocking::channel::<usize>(3);
    // 0 lap
    tx.send(0).unwrap();
    assert_eq!(rx.recv(), Some(0));
    tx.send(1).unwrap();
    let msg_ref = rx.try_recv_ref().unwrap();
    tx.send(2).unwrap();
    assert_eq!(rx.recv(), Some(2));
    // 1 lap
    tx.send(3).unwrap();
    assert_eq!(rx.recv(), Some(3));
    tx.send(4).unwrap();
    assert_eq!(rx.recv(), Some(4));
    drop(msg_ref);
    // 2 lap
    tx.send(5).unwrap();
    tx.send(6).unwrap();
    tx.send(7).unwrap();
    assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
    assert_eq!(rx.recv(), Some(5));
    assert_eq!(rx.recv(), Some(6));
    assert_eq!(rx.recv(), Some(7));
}
#[test]
fn spsc_full_after_skipped() {
    let (tx, rx) = blocking::channel::<usize>(3);
    // 0 lap
    tx.send(0).unwrap();
    assert_eq!(rx.recv(), Some(0));
    tx.send(1).unwrap();
    let _msg_ref = rx.try_recv_ref().unwrap();
    tx.send(2).unwrap();
    // lap 1
    tx.send(3).unwrap();
    assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
}
#[test]
fn spsc_empty_after_skipped() {
    let (tx, rx) = blocking::channel::<usize>(2);
    // 0 lap
    tx.send(0).unwrap();
    tx.send(1).unwrap();
    let _msg_ref = rx.try_recv_ref().unwrap();
    assert_eq!(rx.recv(), Some(1));
    assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
}
can we add tests for this behavior that will run under `loom` as well, or do the existing `loom` tests sufficiently exercise it?
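
A rough sketch of what such a `loom` test might look like, assuming the crate's existing loom-enabled test setup and the same `blocking::channel` API used in the tests above; the test name and exact plumbing are illustrative, not part of the PR.

#[test]
fn loom_push_skips_slot_with_held_ref() {
    loom::model(|| {
        let (tx, rx) = blocking::channel::<usize>(2);
        tx.send(0).unwrap();
        // Hold a read `Ref` so its slot cannot be reused yet.
        let held = rx.try_recv_ref().unwrap();
        // With the skip logic, the producer must not spin forever on the held
        // slot: it either claims another slot or reports the channel as full.
        let producer = loom::thread::spawn(move || {
            let _ = tx.try_send_ref().map(|mut slot| *slot = 1);
        });
        producer.join().unwrap();
        drop(held);
    });
}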
if self.is_pop {
    test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
    test_dbg!(self.slot.has_reader.store(test_dbg!(false), SeqCst));
} else {
    test_println!(
        "drop Ref<{}> (push), new_state = {}",
        core::any::type_name::<T>(),
        self.new_state
    );
    test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
}
if we took my suggestion of storing the reader bit in the `state` field, the `Drop` impl could be simplified, and `Ref` could be one word smaller, because we would either store a `new_state` that advances the generation, or one that clears the reader bit.
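
A hedged sketch of that simplification, using standalone types for illustration rather than the actual patch: with the reader bit folded into `state`, both drop paths reduce to a single store of a precomputed `new_state`, and `Ref` no longer needs a separate `is_pop` flag.

use core::sync::atomic::{AtomicUsize, Ordering::Release};

struct Slot {
    state: AtomicUsize,
}

struct Ref<'a> {
    slot: &'a Slot,
    // For a pop `Ref`: the current state with the reader bit cleared.
    // For a push `Ref`: the state advanced to the next generation.
    new_state: usize,
}

impl Drop for Ref<'_> {
    fn drop(&mut self) {
        // One store covers both cases, so no branch on `is_pop` is needed.
        self.slot.state.store(self.new_state, Release);
    }
}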
huh, interesting --- running the benchmarks on my machine, there's a substantial performance improvement for the blocking SPSC tests, but the equivalent benchmarks for the async channel show a smaller, yet still significant, performance regression (benchmark results). i wonder why that is!
Thank you for your comments and suggestions; I will go through them, probably this Sunday, and ask for another round of review.
Great, sounds good to me!
hi, here's an issue: #83
## v0.1.5 (2024-04-06)

#### Features

* **mpsc:** add `len`, `capacity`, and `remaining` methods to mpsc (#72) (00213c1, closes #71)

#### Bug Fixes

* unused import with `alloc` enabled (ac1eafc)
* skip slots with active reading `Ref`s in `push_ref` (#81) (a72a286, closes #83, #80)
`Core::push_ref` can go into an (almost infinite) spin loop waiting for a `Ref` (created in `pop_ref`) to this slot to be dropped. This behaviour can leave writers blocked until all refs are dropped, even if there is free space in the buffer.

In this PR I propose a mechanism to skip such slots (by updating their `state`) and attempt to write into them on the next lap.
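
For illustration, a conceptual sketch of that skip decision under the bit-packed state layout discussed in the review; this is an assumption-laden outline, not the implementation that was merged in #81.

use core::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical layout, as in the review discussion.
const HAS_READER: usize = 1;
const GEN_SHIFT: u32 = 1;

enum SlotAction {
    Write, // the slot is free for this lap: claim it and write
    Skip,  // a read `Ref` is still alive: advance past it and retry next lap
    Wait,  // the slot has not yet been released from the previous lap
}

fn classify(state: &AtomicUsize, expected_gen: usize) -> SlotAction {
    let s = state.load(SeqCst);
    let (gen, has_reader) = (s >> GEN_SHIFT, (s & HAS_READER) != 0);
    if has_reader {
        SlotAction::Skip
    } else if gen == expected_gen {
        SlotAction::Write
    } else {
        SlotAction::Wait
    }
}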