Skip slots with active reading Refs #80

Closed · wants to merge 5 commits

Changes from 1 commit
src/lib.rs (161 changes: 103 additions & 58 deletions)
@@ -3,6 +3,7 @@
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};

#[macro_use]
mod macros;

@@ -36,7 +37,7 @@ feature! {

use crate::{
loom::{
atomic::{AtomicUsize, Ordering::*},
atomic::{AtomicBool, AtomicUsize, Ordering::*},
cell::{MutPtr, UnsafeCell},
},
mpsc::errors::{TryRecvError, TrySendError},
@@ -62,6 +63,7 @@ pub struct Ref<'slot, T> {
ptr: MutPtr<MaybeUninit<T>>,
slot: &'slot Slot<T>,
new_state: usize,
is_pop: bool,
}

/// Error indicating that a `push` operation failed because a queue was at
@@ -101,6 +103,7 @@ struct Core {
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicUsize,
has_reader: AtomicBool,
hawkw (Owner) commented:

is it really necessary to add an entire additional AtomicBool (a whole word) to store what is essentially one bit of information? would it make sense to store this as a single bit in the state field instead? we could use the first bit to indicate whether there is a reader, and store the generation of the slot in the remaining 63 bits, accessing the generation by shifting the value by 1.

this would have a couple advantages: one, it would decrease the size of each slot by a word --- the memory usage isn't a huge deal, but it does scale with the size of the buffer, which could be meaningful.

more importantly, though, the current approach stores two separate pieces of shared state in the Slot type, which are not both accessed atomically. this means there is a potential for racy behavior to occur when one value has been updated and the other has not yet been. if we store the presence of a reader as a single bit in the state field, both values are always read and written in a single atomic operation.

on the other hand, the approach i'm describing introduces some extra complexity to the code, since the presence of the reader field is not obvious on the struct definition and is instead hidden behind a bitfield...if there isn't a potential for racy behavior here, it may be better to keep this in a separate field.
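
A minimal sketch of that encoding (helper names are hypothetical, not from this PR):

```rust
// Sketch, assuming a 64-bit usize: bit 0 of `state` holds the has-reader
// flag, and the remaining bits hold the slot's generation, so both pieces
// of state are read and written in a single atomic operation.
const HAS_READER: usize = 0b1;

fn pack_state(generation: usize, has_reader: bool) -> usize {
    (generation << 1) | (has_reader as usize)
}

fn generation(state: usize) -> usize {
    state >> 1
}

fn has_reader(state: usize) -> bool {
    state & HAS_READER != 0
}
```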

}

impl Core {
@@ -179,13 +182,12 @@ impl Core {
slots: &'slots [Slot<T>],
recycle: &R,
) -> Result<Ref<'slots, T>, TrySendError<()>>
where
R: Recycle<T>,
where
R: Recycle<T>,
hawkw (Owner) commented:

tiny nit: is this how rustfmt wanted us to format this? let's make sure it's rustfmt-approved before merging, please :)
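
For reference, a sketch of rustfmt's default layout for a signature like this one (abbreviated, assuming default rustfmt settings):

```rust
fn push_ref<'slots, T, R>(
    slots: &'slots [Slot<T>],
    recycle: &R,
) -> Result<Ref<'slots, T>, TrySendError<()>>
where
    R: Recycle<T>,
{
    // ...
}
```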

{
test_println!("push_ref");
let mut backoff = Backoff::new();
let mut tail = self.tail.load(Relaxed);

let mut tail = test_dbg!(self.tail.load(Relaxed));
loop {
if test_dbg!(tail & self.closed != 0) {
return Err(TrySendError::Closed(()));
@@ -210,54 +212,63 @@ impl Core {
);
slots.get_unchecked(idx)
};
let state = test_dbg!(slot.state.load(Acquire));

let state = test_dbg!(slot.state.load(SeqCst));
// slot is writable
if test_dbg!(state == tail) {
// Move the tail index forward by 1.
let next_tail = self.next(idx, gen);
// try to advance the tail
match test_dbg!(self
.tail
.compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
{
Ok(_) => {
// We got the slot! It's now okay to write to it
test_println!("claimed tail slot [{}]", idx);
// Claim exclusive ownership over the slot
let ptr = slot.value.get_mut();

// Initialize or recycle the element.
unsafe {
// Safety: we have just claimed exclusive ownership over
// this slot.
let ptr = ptr.deref();
if gen == 0 {
ptr.write(recycle.new_element());
test_println!("-> initialized");
} else {
// Safety: if the generation is > 0, then the
// slot has already been initialized.
recycle.recycle(ptr.assume_init_mut());
test_println!("-> recycled");
test_println!("advanced tail {} to {}", tail, next_tail);
test_println!("claimed slot [{}]", idx);
let has_reader = test_dbg!(slot.has_reader.load(SeqCst));
Comment on lines +215 to +227
hawkw (Owner) commented:

it looks like it was necessary to change these loads from Acquire to SeqCst because we need the loads of the state and has_reader fields to have a happens-before relationship? if we apply my above suggestion about merging the has-reader bit into the state variable, we could avoid the need to synchronize between two loads, and we could still perform Acquire loads here. this might improve performance a bit...
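
For illustration, a sketch of the single-load version under the packed encoding (hypothetical, matching the bit layout sketched earlier):

```rust
use core::sync::atomic::{AtomicUsize, Ordering::Acquire};

// One Acquire load observes the generation and the reader flag together,
// so there is no second load that needs to be ordered against it.
fn load_packed(state: &AtomicUsize) -> (usize, bool) {
    let state = state.load(Acquire);
    (state >> 1, state & 0b1 != 0)
}
```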

if test_dbg!(!has_reader) {
// We got the slot! It's now okay to write to it
// Claim exclusive ownership over the slot
let ptr = slot.value.get_mut();
// Initialize or recycle the element.
unsafe {
// Safety: we have just claimed exclusive ownership over
// this slot.
let ptr = ptr.deref();
if gen == 0 {
ptr.write(recycle.new_element());
test_println!("-> initialized");
} else {
// Safety: if the generation is > 0, then the
// slot has already been initialized.
recycle.recycle(ptr.assume_init_mut());
test_println!("-> recycled");
}
}
return Ok(Ref {
ptr,
new_state: tail + 1,
slot,
is_pop: false,
});
} else {
test_println!("has an active reader, skipping slot [{}]", idx);
let next_state = tail.wrapping_add(self.gen);
test_dbg!(slot.state.store(test_dbg!(next_state), Release));
backoff.spin();
continue;
}

return Ok(Ref {
ptr,
new_state: tail + 1,
slot,
});
}
Err(actual) => {
// Someone else took this slot and advanced the tail
// index. Try to claim the new tail.
test_println!("failed to advance tail {} to {}", tail, next_tail);
tail = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
} else {
// check if we have any available slots
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
@@ -270,12 +281,9 @@ impl Core {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin();
} else {
backoff.spin_yield();
}

backoff.spin_yield();
tail = test_dbg!(self.tail.load(Acquire));
}
}
@@ -308,43 +316,47 @@ impl Core {
);
slots.get_unchecked(idx)
};

let state = test_dbg!(slot.state.load(Acquire));
let next_head = self.next(idx, gen);

// If the slot's state is ahead of the head index by one, we can pop
// it.
// If the slot's state is ahead of the head index by one, we can pop it.
if test_dbg!(state == head + 1) {
let next_head = self.next(idx, gen);
// try to advance the head index
match test_dbg!(self
.head
.compare_exchange_weak(head, next_head, SeqCst, Acquire))
{
Ok(_) => {
test_println!("claimed head slot [{}]", idx);
test_println!("advanced head {} to {}", head, next_head);
test_println!("claimed slot [{}]", idx);
let new_state = head.wrapping_add(self.gen);
test_dbg!(slot.has_reader.store(true, SeqCst));
test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst));
return Ok(Ref {
new_state: head.wrapping_add(self.gen),
new_state,
ptr: slot.value.get_mut(),
slot,
is_pop: true,
});
}
Err(actual) => {
test_println!("failed to advance head, head={}, actual={}", head, actual);
head = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state == head) {
} else {
// Maybe we reached the tail index? If so, the buffer is empty.
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...

let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(TryRecvError::Closed)
@@ -354,16 +366,38 @@
};
}

if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
// Is anyone writing to the slot from this generation?
if test_dbg!(state == head) {
if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
}
backoff.spin();
continue;
}

backoff.spin();
} else {
backoff.spin_yield();
// The slot is in an invalid state (it was skipped). Try to advance the head index.
match test_dbg!(self
.head
.compare_exchange_weak(head, next_head, SeqCst, Acquire))
{
Ok(_) => {
test_println!("skipped head slot [{}], new head={}", idx, next_head);
backoff.spin();
continue;
}
Err(actual) => {
test_println!(
"failed to skip head slot [{}], head={}, actual={}",
idx,
head,
actual
);
head = actual;
backoff.spin();
continue;
}
}
}

head = test_dbg!(self.head.load(Acquire));
}
}

@@ -475,8 +509,17 @@ impl<T> ops::DerefMut for Ref<'_, T> {
impl<T> Drop for Ref<'_, T> {
#[inline]
fn drop(&mut self) {
test_println!("drop Ref<{}>", core::any::type_name::<T>());
test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
if self.is_pop {
test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
test_dbg!(self.slot.has_reader.store(test_dbg!(false), SeqCst));
} else {
test_println!(
"drop Ref<{}> (push), new_state = {}",
core::any::type_name::<T>(),
self.new_state
);
test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
Comment on lines +512 to +521
hawkw (Owner) commented:

if we took my suggestion of storing the reader bit in the state field, the Drop impl could be simplified, and Ref could be one word smaller, because we would either store a new_state that advances the generation, or one that clears the reader bit.
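
A rough sketch of that simplified Drop (hypothetical; assumes `new_state` is precomputed on push to advance the generation and on pop to clear the reader bit, reusing the file's `Ref` type and `Release` import):

```rust
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        test_println!("drop Ref<{}>", core::any::type_name::<T>());
        // A single store covers both the push and pop cases, so the
        // `is_pop` flag (and one word of `Ref`) goes away.
        test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
    }
}
```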

}
}
}

@@ -542,6 +585,7 @@ impl<T> Slot<T> {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(idx),
has_reader: AtomicBool::new(false),
}
}

@@ -550,6 +594,7 @@ impl<T> Slot<T> {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(idx),
has_reader: AtomicBool::new(false),
}
}
}
tests/mpsc_blocking.rs (53 changes: 53 additions & 0 deletions)
@@ -1,5 +1,6 @@
use std::thread;
use thingbuf::mpsc::blocking;
use thingbuf::mpsc::errors::{TryRecvError, TrySendError};

#[test]
fn basically_works() {
@@ -70,3 +71,55 @@ fn tx_close_drains_queue() {
producer.join().unwrap();
}
}

#[test]
fn spsc_skip_slot() {
let (tx, rx) = blocking::channel::<usize>(3);
// 0 lap
tx.send(0).unwrap();
assert_eq!(rx.recv(), Some(0));
tx.send(1).unwrap();
let msg_ref = rx.try_recv_ref().unwrap();
tx.send(2).unwrap();
assert_eq!(rx.recv(), Some(2));
// 1 lap
tx.send(3).unwrap();
assert_eq!(rx.recv(), Some(3));
tx.send(4).unwrap();
assert_eq!(rx.recv(), Some(4));
drop(msg_ref);
// 2 lap
tx.send(5).unwrap();
tx.send(6).unwrap();
tx.send(7).unwrap();
assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
assert_eq!(rx.recv(), Some(5));
assert_eq!(rx.recv(), Some(6));
assert_eq!(rx.recv(), Some(7));
}

#[test]
fn spsc_full_after_skipped() {
let (tx, rx) = blocking::channel::<usize>(3);
// 0 lap
tx.send(0).unwrap();
assert_eq!(rx.recv(), Some(0));
tx.send(1).unwrap();
let _msg_ref = rx.try_recv_ref().unwrap();
tx.send(2).unwrap();
// 1 lap
tx.send(3).unwrap();
assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
}

#[test]
fn spsc_empty_after_skipped() {
let (tx, rx) = blocking::channel::<usize>(2);
// 0 lap
tx.send(0).unwrap();
tx.send(1).unwrap();
let _msg_ref = rx.try_recv_ref().unwrap();
assert_eq!(rx.recv(), Some(1));
assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
}
Comment on lines +75 to +124
hawkw (Owner) commented:

can we add tests for this behavior that will run under loom, as well, or do the existing loom tests sufficiently exercise it?
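
A rough sketch of what such a loom test might look like (hypothetical; assumes the blocking channel is usable under the crate's loom setup, and the capacity and values are illustrative):

```rust
#[test]
fn loom_skip_slot() {
    loom::model(|| {
        let (tx, rx) = blocking::channel::<usize>(2);
        tx.try_send(1).unwrap();
        // Hold the read Ref across a concurrent send, forcing the sender
        // to skip this slot rather than overwrite it.
        let held = rx.try_recv_ref().unwrap();
        let producer = loom::thread::spawn(move || {
            let _ = tx.try_send(2);
        });
        producer.join().unwrap();
        drop(held);
        let _ = rx.try_recv_ref();
    });
}
```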