Skip to content

Commit

Permalink
Merge branch 'main' into feature/channel-len
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored Apr 6, 2024
2 parents 220eb87 + c839b60 commit 2348a84
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 62 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ jobs:
- mpsc_send_recv_wrap
- mpsc_try_send_recv
- mpsc_try_recv_ref
- mpsc_test_skip_slot
- mpsc_async::rx_close_unconsumed
- mpsc_blocking::rx_close_unconsumed
name: model '${{ matrix.model }}'
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
Cargo.lock
.direnv
177 changes: 130 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};

#[macro_use]
mod macros;

Expand Down Expand Up @@ -43,6 +44,13 @@ use crate::{
util::{Backoff, CachePadded},
};

/// Flag bit (the most significant bit of a slot's state / of `usize`) set
/// while a reader holds a `Ref` to that slot.
const HAS_READER: usize = 1 << (usize::BITS - 1);

/// Maximum capacity of a `ThingBuf`: the largest number of elements that can
/// be stored in a `ThingBuf`. This is `usize::MAX` with the most significant
/// bit cleared (i.e. `2^(usize::BITS - 1) - 1`), since that bit is reserved
/// for the "has reader" flag.
pub const MAX_CAPACITY: usize = usize::MAX >> 1;

/// A reference to an entry in a [`ThingBuf`].
///
/// A `Ref` represents the exclusive permission to mutate a given element in a
Expand All @@ -62,6 +70,7 @@ pub struct Ref<'slot, T> {
ptr: MutPtr<MaybeUninit<T>>,
slot: &'slot Slot<T>,
new_state: usize,
is_pop: bool,
}

/// Error indicating that a `push` operation failed because a queue was at
Expand Down Expand Up @@ -100,12 +109,19 @@ struct Core {

struct Slot<T> {
    /// The stored element. Exclusive access is mediated by `state`; the cell
    /// may be uninitialized until a writer first claims the slot.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Each slot's state has two components: a flag indicated by the most significant bit (MSB), and the rest of the state.
    /// The MSB (the `HAS_READER` bit) is set when a reader is reading from this slot.
    /// The rest of the state helps determine the availability of the slot for reading or writing:
    /// - A slot is available for reading when the state (excluding the MSB) equals head + 1.
    /// - A slot is available for writing when the state (excluding the MSB) equals tail.
    /// At initialization, each slot's state is set to its ordinal index.
    state: AtomicUsize,
}

impl Core {
#[cfg(not(all(loom, test)))]
const fn new(capacity: usize) -> Self {
assert!(capacity <= MAX_CAPACITY);
let closed = (capacity + 1).next_power_of_two();
let idx_mask = closed - 1;
let gen = closed << 1;
Expand Down Expand Up @@ -156,7 +172,7 @@ impl Core {
} else {
// We've reached the end of the current lap, wrap the index around
// to 0.
gen.wrapping_add(self.gen)
wrapping_add(gen, self.gen)
}
}

Expand Down Expand Up @@ -184,8 +200,7 @@ impl Core {
{
test_println!("push_ref");
let mut backoff = Backoff::new();
let mut tail = self.tail.load(Relaxed);

let mut tail = test_dbg!(self.tail.load(Relaxed));
loop {
if test_dbg!(tail & self.closed != 0) {
return Err(TrySendError::Closed(()));
Expand All @@ -210,21 +225,43 @@ impl Core {
);
slots.get_unchecked(idx)
};
let state = test_dbg!(slot.state.load(Acquire));

let raw_state = test_dbg!(slot.state.load(SeqCst));
let state = test_dbg!(clear_has_reader(raw_state));
// slot is writable
if test_dbg!(state == tail) {
// Move the tail index forward by 1.
let next_tail = self.next(idx, gen);
// try to advance the tail
match test_dbg!(self
.tail
.compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
{
Ok(_) if test_dbg!(check_has_reader(raw_state)) => {
test_println!(
"advanced tail {} to {}; has an active reader, skipping slot [{}]",
tail,
next_tail,
idx
);
let next_state = wrapping_add(tail, self.gen);
test_dbg!(slot
.state
.fetch_update(SeqCst, SeqCst, |state| {
Some(state & HAS_READER | next_state)
})
.unwrap_or_else(|_| unreachable!()));
backoff.spin();
continue;
}
Ok(_) => {
// We got the slot! It's now okay to write to it
test_println!("claimed tail slot [{}]", idx);
test_println!(
"advanced tail {} to {}; claimed slot [{}]",
tail,
next_tail,
idx
);
// Claim exclusive ownership over the slot
let ptr = slot.value.get_mut();

// Initialize or recycle the element.
unsafe {
// Safety: we have just claimed exclusive ownership over
Expand All @@ -240,42 +277,39 @@ impl Core {
test_println!("-> recycled");
}
}

return Ok(Ref {
ptr,
new_state: tail + 1,
slot,
is_pop: false,
});
}
Err(actual) => {
// Someone else took this slot and advanced the tail
// index. Try to claim the new tail.
test_println!("failed to advance tail {} to {}", tail, next_tail);
tail = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(head.wrapping_add(self.gen) == tail) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin();
} else {
backoff.spin_yield();
// check if we have any available slots
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(wrapping_add(head, self.gen) == tail) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin_yield();
tail = test_dbg!(self.tail.load(Acquire));
}
}
Expand Down Expand Up @@ -308,43 +342,47 @@ impl Core {
);
slots.get_unchecked(idx)
};
let state = test_dbg!(slot.state.load(Acquire));

// If the slot's state is ahead of the head index by one, we can pop
// it.
if test_dbg!(state == head + 1) {
let next_head = self.next(idx, gen);
let raw_state = test_dbg!(slot.state.load(Acquire));
let next_head = self.next(idx, gen);

// If the slot's state is ahead of the head index by one, we can pop it.
if test_dbg!(raw_state == head + 1) {
// try to advance the head index
match test_dbg!(self
.head
.compare_exchange_weak(head, next_head, SeqCst, Acquire))
{
Ok(_) => {
test_println!("claimed head slot [{}]", idx);
test_println!("advanced head {} to {}", head, next_head);
test_println!("claimed slot [{}]", idx);
let mut new_state = wrapping_add(head, self.gen);
new_state = set_has_reader(new_state);
test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst));
return Ok(Ref {
new_state: head.wrapping_add(self.gen),
new_state,
ptr: slot.value.get_mut(),
slot,
is_pop: true,
});
}
Err(actual) => {
test_println!("failed to advance head, head={}, actual={}", head, actual);
head = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state == head) {
} else {
// Maybe we reached the tail index? If so, the buffer is empty.
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...

let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(TryRecvError::Closed)
Expand All @@ -354,16 +392,32 @@ impl Core {
};
}

if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
// Is anyone writing to the slot from this generation?
if test_dbg!(raw_state == head) {
if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
}
backoff.spin();
continue;
}

backoff.spin();
} else {
backoff.spin_yield();
// The slot is in an invalid state (was skipped). Try to advance the head index.
match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
Ok(_) => {
test_println!("skipped head slot [{}], new head={}", idx, next_head);
}
Err(actual) => {
test_println!(
"failed to skip head slot [{}], head={}, actual={}",
idx,
head,
actual
);
head = actual;
backoff.spin();
}
}
}

head = test_dbg!(self.head.load(Acquire));
}
}

Expand Down Expand Up @@ -409,6 +463,26 @@ impl Core {
}
}

/// Returns `true` if `state` has the `HAS_READER` flag (MSB) set, i.e. a
/// reader currently holds a `Ref` to the slot.
#[inline]
fn check_has_reader(state: usize) -> bool {
    state & HAS_READER == HAS_READER
}

/// Returns `state` with the `HAS_READER` flag (MSB) set.
#[inline]
fn set_has_reader(state: usize) -> usize {
    state | HAS_READER
}

/// Returns `state` with the `HAS_READER` flag (MSB) cleared, leaving only the
/// index/generation portion of the state.
#[inline]
fn clear_has_reader(state: usize) -> usize {
    state & !HAS_READER
}

/// Adds `a` and `b`, wrapping within the state range (`MAX_CAPACITY`, i.e.
/// everything below the reserved MSB).
#[inline]
fn wrapping_add(a: usize, b: usize) -> usize {
    // Use `usize::wrapping_add` rather than `+`: a checked add would panic in
    // debug builds if the sum crosses `usize::MAX` (e.g. if a raw state with
    // the `HAS_READER` bit still set were ever passed in). The mask then
    // folds the result back into the MSB-free state range.
    a.wrapping_add(b) & MAX_CAPACITY
}

impl Drop for Core {
fn drop(&mut self) {
debug_assert!(
Expand Down Expand Up @@ -475,8 +549,17 @@ impl<T> ops::DerefMut for Ref<'_, T> {
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        // NOTE(review): the diff paste retained the pre-merge unconditional
        // `store` alongside the new branch; only the branch belongs in the
        // merged code — an unconditional store would clobber the slot state
        // (including the reader flag) before the branch ran.
        if self.is_pop {
            test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
            // A pop `Ref` holds the `HAS_READER` flag; clearing it (and only
            // it) releases the slot without disturbing any state a writer may
            // have installed while skipping this slot.
            test_dbg!(self.slot.state.fetch_and(!HAS_READER, SeqCst));
        } else {
            test_println!(
                "drop Ref<{}> (push), new_state = {}",
                core::any::type_name::<T>(),
                self.new_state
            );
            // A push `Ref` publishes the written element by storing the
            // slot's next state with `Release` ordering.
            test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
        }
    }
}

Expand Down
1 change: 1 addition & 0 deletions src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ mod inner {
mod inner {
#![allow(dead_code)]
pub(crate) mod sync {
#[allow(unused_imports)]
pub use core::sync::*;

#[cfg(feature = "alloc")]
Expand Down
9 changes: 8 additions & 1 deletion src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use errors::*;
feature! {
#![feature = "alloc"]

use crate::loom::sync::Arc;
use crate::{MAX_CAPACITY, loom::sync::Arc};
use alloc::boxed::Box;

/// Returns a new asynchronous multi-producer, single consumer (MPSC)
Expand All @@ -32,10 +32,17 @@ feature! {
/// Returns a new asynchronous multi-producer, single consumer (MPSC)
/// channel with the provided capacity and [recycling policy].
///
/// # Panics
///
/// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))` (the crate's
/// `MAX_CAPACITY`). This is the largest `usize` value whose most significant bit — reserved
/// for the "has reader" flag — is clear.
///
/// [recycling policy]: crate::recycling::Recycle
#[must_use]
pub fn with_recycle<T, R: Recycle<T>>(capacity: usize, recycle: R) -> (Sender<T, R>, Receiver<T, R>) {
assert!(capacity > 0);
assert!(capacity <= MAX_CAPACITY);
let inner = Arc::new(Inner {
core: ChannelCore::new(capacity),
slots: Slot::make_boxed_array(capacity),
Expand Down
Loading

0 comments on commit 2348a84

Please sign in to comment.