diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e983a1a..b7e860d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -170,6 +170,7 @@ jobs:
           - mpsc_send_recv_wrap
           - mpsc_try_send_recv
           - mpsc_try_recv_ref
+          - mpsc_test_skip_slot
           - mpsc_async::rx_close_unconsumed
           - mpsc_blocking::rx_close_unconsumed
     name: model '${{ matrix.model }}'
diff --git a/.gitignore b/.gitignore
index 96ef6c0..7d4d835 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 /target
 Cargo.lock
+.direnv
diff --git a/src/lib.rs b/src/lib.rs
index 228ffe8..e3b2eee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@
 #![doc = include_str!("../README.md")]
 #![warn(missing_docs)]
 use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};
+
 #[macro_use]
 mod macros;
 
@@ -43,6 +44,13 @@ use crate::{
     util::{Backoff, CachePadded},
 };
 
+const HAS_READER: usize = 1 << (usize::BITS - 1);
+
+/// Maximum capacity of a `ThingBuf`. This is the largest number of elements that
+/// can be stored in a `ThingBuf`: the largest value a `usize` can represent with
+/// its most significant bit, which is reserved for the "has reader" flag, cleared.
+pub const MAX_CAPACITY: usize = usize::MAX & !HAS_READER;
+
 /// A reference to an entry in a [`ThingBuf`].
 ///
 /// A `Ref` represents the exclusive permission to mutate a given element in a
@@ -62,6 +70,7 @@ pub struct Ref<'slot, T> {
     ptr: MutPtr<MaybeUninit<T>>,
     slot: &'slot Slot<T>,
     new_state: usize,
+    is_pop: bool,
 }
 
 /// Error indicating that a `push` operation failed because a queue was at
@@ -100,12 +109,19 @@ struct Core {
 
 struct Slot<T> {
     value: UnsafeCell<MaybeUninit<T>>,
+    /// Each slot's state has two components: a flag stored in the most significant bit (MSB), and the rest of the state.
+    /// The MSB is set while a reader is reading from this slot.
+    /// The rest of the state determines the slot's availability for reading or writing:
+    /// - A slot is available for reading when the state (excluding the MSB) equals head + 1.
+    /// - A slot is available for writing when the state (excluding the MSB) equals tail.
+    /// At initialization, each slot's state is set to its ordinal index.
     state: AtomicUsize,
 }
 
 impl Core {
     #[cfg(not(all(loom, test)))]
     const fn new(capacity: usize) -> Self {
+        assert!(capacity <= MAX_CAPACITY);
         let closed = (capacity + 1).next_power_of_two();
         let idx_mask = closed - 1;
         let gen = closed << 1;
@@ -156,7 +172,7 @@ impl Core {
         } else {
             // We've reached the end of the current lap, wrap the index around
             // to 0.
-            gen.wrapping_add(self.gen)
+            wrapping_add(gen, self.gen)
         }
     }
 
@@ -184,8 +200,7 @@ impl Core {
     {
         test_println!("push_ref");
         let mut backoff = Backoff::new();
-        let mut tail = self.tail.load(Relaxed);
-
+        let mut tail = test_dbg!(self.tail.load(Relaxed));
         loop {
             if test_dbg!(tail & self.closed != 0) {
                 return Err(TrySendError::Closed(()));
@@ -210,21 +225,43 @@ impl Core {
                 );
                 slots.get_unchecked(idx)
             };
-            let state = test_dbg!(slot.state.load(Acquire));
-
+            let raw_state = test_dbg!(slot.state.load(SeqCst));
+            let state = test_dbg!(clear_has_reader(raw_state));
+            // slot is writable
             if test_dbg!(state == tail) {
-                // Move the tail index forward by 1.
                 let next_tail = self.next(idx, gen);
+                // try to advance the tail
                 match test_dbg!(self
                     .tail
                     .compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
                 {
+                    Ok(_) if test_dbg!(check_has_reader(raw_state)) => {
+                        test_println!(
+                            "advanced tail {} to {}; has an active reader, skipping slot [{}]",
+                            tail,
+                            next_tail,
+                            idx
+                        );
+                        let next_state = wrapping_add(tail, self.gen);
+                        test_dbg!(slot
+                            .state
+                            .fetch_update(SeqCst, SeqCst, |state| {
+                                Some(state & HAS_READER | next_state)
+                            })
+                            .unwrap_or_else(|_| unreachable!()));
+                        backoff.spin();
+                        continue;
+                    }
                     Ok(_) => {
                         // We got the slot! It's now okay to write to it
-                        test_println!("claimed tail slot [{}]", idx);
+                        test_println!(
+                            "advanced tail {} to {}; claimed slot [{}]",
+                            tail,
+                            next_tail,
+                            idx
+                        );
                         // Claim exclusive ownership over the slot
                         let ptr = slot.value.get_mut();
-
                         // Initialize or recycle the element.
                         unsafe {
                             // Safety: we have just claimed exclusive ownership over
@@ -240,16 +277,17 @@ impl Core {
                                 test_println!("-> recycled");
                             }
                         }
-
                         return Ok(Ref {
                             ptr,
                             new_state: tail + 1,
                             slot,
+                            is_pop: false,
                         });
                     }
                     Err(actual) => {
                         // Someone else took this slot and advanced the tail
                         // index. Try to claim the new tail.
+                        test_println!("failed to advance tail {} to {}", tail, next_tail);
                         tail = actual;
                         backoff.spin();
                         continue;
@@ -257,25 +295,21 @@ impl Core {
                     }
                 }
 
-            if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
-                // fake RMW op to placate loom. this should be equivalent to
-                // doing a relaxed load after a SeqCst fence (per Godbolt
-                // https://godbolt.org/z/zb15qfEa9), however, loom understands
-                // this correctly, while it does not understand an explicit
-                // SeqCst fence and a load.
-                // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
-                // load it gets reordered differently in the model checker lmao...
-                let head = test_dbg!(self.head.fetch_or(0, SeqCst));
-                if test_dbg!(head.wrapping_add(self.gen) == tail) {
-                    test_println!("channel full");
-                    return Err(TrySendError::Full(()));
-                }
-
-                backoff.spin();
-            } else {
-                backoff.spin_yield();
+            // check if we have any available slots
+            // fake RMW op to placate loom. this should be equivalent to
+            // doing a relaxed load after a SeqCst fence (per Godbolt
+            // https://godbolt.org/z/zb15qfEa9), however, loom understands
+            // this correctly, while it does not understand an explicit
+            // SeqCst fence and a load.
+            // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
+            // load it gets reordered differently in the model checker lmao...
+            let head = test_dbg!(self.head.fetch_or(0, SeqCst));
+            if test_dbg!(wrapping_add(head, self.gen) == tail) {
+                test_println!("channel full");
+                return Err(TrySendError::Full(()));
             }
-
+            backoff.spin_yield();
             tail = test_dbg!(self.tail.load(Acquire));
         }
     }
@@ -308,33 +342,39 @@ impl Core {
                 );
                 slots.get_unchecked(idx)
             };
-            let state = test_dbg!(slot.state.load(Acquire));
-            // If the slot's state is ahead of the head index by one, we can pop
-            // it.
-            if test_dbg!(state == head + 1) {
-                let next_head = self.next(idx, gen);
+            let raw_state = test_dbg!(slot.state.load(Acquire));
+            let next_head = self.next(idx, gen);
+
+            // If the slot's state is ahead of the head index by one, we can pop it.
+            if test_dbg!(raw_state == head + 1) {
+                // try to advance the head index
                 match test_dbg!(self
                     .head
                     .compare_exchange_weak(head, next_head, SeqCst, Acquire))
                 {
                     Ok(_) => {
-                        test_println!("claimed head slot [{}]", idx);
+                        test_println!("advanced head {} to {}", head, next_head);
+                        test_println!("claimed slot [{}]", idx);
+                        let mut new_state = wrapping_add(head, self.gen);
+                        new_state = set_has_reader(new_state);
+                        test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst));
                         return Ok(Ref {
-                            new_state: head.wrapping_add(self.gen),
+                            new_state,
                             ptr: slot.value.get_mut(),
                             slot,
+                            is_pop: true,
                         });
                     }
                     Err(actual) => {
+                        test_println!("failed to advance head, head={}, actual={}", head, actual);
                         head = actual;
                         backoff.spin();
                         continue;
                     }
                 }
-            }
-
-            if test_dbg!(state == head) {
+            } else {
+                // Maybe we reached the tail index? If so, the buffer is empty.
                 // fake RMW op to placate loom. this should be equivalent to
                 // doing a relaxed load after a SeqCst fence (per Godbolt
                 // https://godbolt.org/z/zb15qfEa9), however, loom understands
@@ -342,9 +382,8 @@ impl Core {
                 // SeqCst fence and a load.
                 // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
                 // load it gets reordered differently in the model checker lmao...
                 let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));
-
                 if test_dbg!(tail & !self.closed == head) {
                     return if test_dbg!(tail & self.closed != 0) {
                         Err(TryRecvError::Closed)
                     } else {
@@ -354,16 +392,32 @@ impl Core {
                     };
                 }
 
-            if test_dbg!(backoff.done_spinning()) {
-                return Err(TryRecvError::Empty);
+                // Is anyone writing to the slot from this generation?
+                if test_dbg!(raw_state == head) {
+                    if test_dbg!(backoff.done_spinning()) {
+                        return Err(TryRecvError::Empty);
+                    }
+                    backoff.spin();
+                    continue;
                 }
 
-            backoff.spin();
-        } else {
-            backoff.spin_yield();
+                // The slot is in an invalid state (was skipped). Try to advance the head index.
+                match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
+                    Ok(_) => {
+                        test_println!("skipped head slot [{}], new head={}", idx, next_head);
+                    }
+                    Err(actual) => {
+                        test_println!(
+                            "failed to skip head slot [{}], head={}, actual={}",
+                            idx,
+                            head,
+                            actual
+                        );
+                        head = actual;
+                        backoff.spin();
+                    }
+                }
             }
-
-            head = test_dbg!(self.head.load(Acquire));
         }
     }
 
@@ -409,6 +463,26 @@ impl Core {
     }
 }
 
+#[inline]
+fn check_has_reader(state: usize) -> bool {
+    state & HAS_READER == HAS_READER
+}
+
+#[inline]
+fn set_has_reader(state: usize) -> usize {
+    state | HAS_READER
+}
+
+#[inline]
+fn clear_has_reader(state: usize) -> usize {
+    state & !HAS_READER
+}
+
+#[inline]
+fn wrapping_add(a: usize, b: usize) -> usize {
+    (a + b) & MAX_CAPACITY
+}
+
 impl Drop for Core {
     fn drop(&mut self) {
         debug_assert!(
@@ -475,8 +549,17 @@ impl<T> ops::DerefMut for Ref<'_, T> {
 
 impl<T> Drop for Ref<'_, T> {
     #[inline]
     fn drop(&mut self) {
-        test_println!("drop Ref<{}>", core::any::type_name::<T>());
-        test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
+        if self.is_pop {
+            test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
+            test_dbg!(self.slot.state.fetch_and(!HAS_READER, SeqCst));
+        } else {
+            test_println!(
+                "drop Ref<{}> (push), new_state = {}",
+                core::any::type_name::<T>(),
+                self.new_state
+            );
+            test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
+        }
     }
 }
diff --git a/src/loom.rs b/src/loom.rs
index 7b67828..4c0d147 100644
--- a/src/loom.rs
+++ b/src/loom.rs
@@ -204,6 +204,7 @@ mod inner {
 mod inner {
     #![allow(dead_code)]
     pub(crate) mod sync {
+        #[allow(unused_imports)]
         pub use core::sync::*;
 
         #[cfg(feature = "alloc")]
diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs
index dcf2a8e..4039428 100644
--- a/src/mpsc/async_impl.rs
+++ b/src/mpsc/async_impl.rs
@@ -15,7 +15,7 @@ use errors::*;
 
 feature! {
     #![feature = "alloc"]
-    use crate::loom::sync::Arc;
+    use crate::{MAX_CAPACITY, loom::sync::Arc};
     use alloc::boxed::Box;
 
     /// Returns a new asynchronous multi-producer, single consumer (MPSC)
@@ -32,10 +32,17 @@ feature! {
     /// Returns a new asynchronous multi-producer, single consumer (MPSC)
    /// channel with the provided capacity and [recycling policy].
     ///
+    /// # Panics
+    ///
+    /// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This is the
+    /// largest value a `usize` can represent with its most significant bit, which is
+    /// reserved for internal use, cleared.
+    ///
     /// [recycling policy]: crate::recycling::Recycle
     #[must_use]
     pub fn with_recycle<T, R: Recycle<T>>(capacity: usize, recycle: R) -> (Sender<T, R>, Receiver<T, R>) {
         assert!(capacity > 0);
+        assert!(capacity <= MAX_CAPACITY);
         let inner = Arc::new(Inner {
             core: ChannelCore::new(capacity),
             slots: Slot::make_boxed_array(capacity),
diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs
index cec6faa..8a0efa1 100644
--- a/src/mpsc/blocking.rs
+++ b/src/mpsc/blocking.rs
@@ -4,16 +4,17 @@
 //! [`Receiver`] types in this module wait by blocking the current thread,
 //! rather than asynchronously yielding.
 use super::*;
-use crate::{
-    loom::{
-        atomic::{self, Ordering},
-        sync::Arc,
-        thread::{self, Thread},
-    },
-    recycling::{self, Recycle},
-    util::Backoff,
-    wait::queue,
-};
+use crate::{
+    loom::{
+        atomic::{self, Ordering},
+        sync::Arc,
+        thread::{self, Thread},
+    },
+    recycling::{self, Recycle},
+    util::Backoff,
+    wait::queue,
+    MAX_CAPACITY,
+};
 use core::{fmt, pin::Pin};
 use errors::*;
 use std::time::{Duration, Instant};
@@ -32,6 +27,11 @@ pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>)
 /// Returns a new synchronous multi-producer, single consumer channel with
 /// the provided [recycling policy].
 ///
+/// # Panics
+///
+/// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This is the
+/// largest value a `usize` can represent with its most significant bit, which is
+/// reserved for internal use, cleared.
+///
 /// [recycling policy]: crate::recycling::Recycle
 #[must_use]
 pub fn with_recycle<T, R: Recycle<T>>(
@@ -39,6 +39,7 @@ pub fn with_recycle<T, R: Recycle<T>>(
     recycle: R,
 ) -> (Sender<T, R>, Receiver<T, R>) {
     assert!(capacity > 0);
+    assert!(capacity <= MAX_CAPACITY);
     let inner = Arc::new(Inner {
         core: ChannelCore::new(capacity),
         slots: Slot::make_boxed_array(capacity),
diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs
index 2648146..b720540 100644
--- a/src/mpsc/tests/mpsc_async.rs
+++ b/src/mpsc/tests/mpsc_async.rs
@@ -75,6 +75,70 @@ fn mpsc_try_recv_ref() {
     })
 }
 
+#[test]
+#[cfg_attr(ci_skip_slow_models, ignore)]
+fn mpsc_test_skip_slot() {
+    // This test emulates a situation where we might need to skip a slot. The setup includes two writing
+    // threads that write elements to the channel and one reading thread that maintains a RecvRef to the
+    // third element until the end of the test, necessitating the skip:
+    // Given that the channel capacity is 2, here's the sequence of operations:
+    //   Thread 1 writes: 1, 2
+    //   Thread 2 writes: 3, 4
+    //   The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
+    // As a result, the third slot is skipped during this process.
+    loom::model(|| {
+        let (tx, rx) = channel(2);
+
+        let p1 = {
+            let tx = tx.clone();
+            thread::spawn(move || {
+                future::block_on(async move {
+                    tx.send(1).await.unwrap();
+                    tx.send(2).await.unwrap();
+                })
+            })
+        };
+
+        let p2 = {
+            thread::spawn(move || {
+                future::block_on(async move {
+                    tx.send(3).await.unwrap();
+                    tx.send(4).await.unwrap();
+                })
+            })
+        };
+
+        let mut vals = Vec::new();
+        let mut hold: Vec<RecvRef<i32>> = Vec::new();
+
+        while vals.len() < 4 {
+            match rx.try_recv_ref() {
+                Ok(val) => {
+                    if vals.len() == 2 && !hold.is_empty() {
+                        vals.push(*hold.pop().unwrap());
+                        vals.push(*val);
+                    } else if vals.len() == 1 && hold.is_empty() {
+                        hold.push(val);
+                    } else {
+                        vals.push(*val);
+                    }
+                }
+                Err(TryRecvError::Empty) => {}
+                Err(TryRecvError::Closed) => {
+                    panic!("channel closed");
+                }
+            }
+            thread::yield_now();
+        }
+
+        vals.sort_unstable();
+        assert_eq_dbg!(vals, vec![1, 2, 3, 4]);
+
+        p1.join().unwrap();
+        p2.join().unwrap();
+    })
+}
+
 #[test]
 fn rx_closes() {
     const ITERATIONS: usize = 6;
@@ -278,6 +342,43 @@ fn spsc_send_recv_in_order_wrap() {
     })
 }
 
+#[test]
+fn spsc_send_recv_in_order_skip_wrap() {
+    const N_SENDS: usize = 5;
+    loom::model(|| {
+        let (tx, rx) = channel::<usize>((N_SENDS + 1) / 2);
+        let consumer = thread::spawn(move || {
+            future::block_on(async move {
+                let mut hold = Vec::new();
+                assert_eq_dbg!(rx.recv().await, Some(1));
+                loop {
+                    match rx.try_recv_ref() {
+                        Ok(val) => {
+                            assert_eq_dbg!(*val, 2);
+                            hold.push(val);
+                            break;
+                        }
+                        Err(TryRecvError::Empty) => {
+                            loom::thread::yield_now();
+                        }
+                        Err(TryRecvError::Closed) => panic!("channel closed"),
+                    }
+                }
+                for i in 3..=N_SENDS {
+                    assert_eq_dbg!(rx.recv().await, Some(i));
+                }
+                assert_eq_dbg!(rx.recv().await, None);
+            });
+        });
+        future::block_on(async move {
+            for i in 1..=N_SENDS {
+                tx.send(i).await.unwrap();
+            }
+        });
+        consumer.join().unwrap();
+    });
+}
+
 #[test]
 #[cfg_attr(ci_skip_slow_models, ignore)]
 fn mpsc_send_recv_wrap() {
diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs
index 89b6f0d..533f583 100644
--- a/src/mpsc/tests/mpsc_blocking.rs
+++ b/src/mpsc/tests/mpsc_blocking.rs
@@ -1,5 +1,7 @@
 use super::*;
 use crate::loom::{self, alloc::Track, thread};
+use crate::mpsc::blocking;
+use crate::mpsc::blocking::RecvRef;
 
 #[test]
 #[cfg_attr(ci_skip_slow_models, ignore)]
@@ -71,6 +73,68 @@ fn mpsc_try_recv_ref() {
     })
 }
 
+#[test]
+#[cfg_attr(ci_skip_slow_models, ignore)]
+fn mpsc_test_skip_slot() {
+    // This test emulates a situation where we might need to skip a slot. The setup includes two writing
+    // threads that write elements to the channel and one reading thread that maintains a RecvRef to the
+    // third element until the end of the test, necessitating the skip:
+    // Given that the channel capacity is 2, here's the sequence of operations:
+    //   Thread 1 writes: 1, 2
+    //   Thread 2 writes: 3, 4
+    //   The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1.
+    // As a result, the third slot is skipped during this process.
+    loom::model(|| {
+        let (tx, rx) = blocking::channel(2);
+
+        let p1 = {
+            let tx = tx.clone();
+            thread::spawn(move || {
+                *tx.send_ref().unwrap() = 1;
+                thread::yield_now();
+                *tx.send_ref().unwrap() = 2;
+            })
+        };
+
+        let p2 = {
+            thread::spawn(move || {
+                *tx.send_ref().unwrap() = 3;
+                thread::yield_now();
+                *tx.send_ref().unwrap() = 4;
+            })
+        };
+
+        let mut vals = Vec::new();
+        let mut hold: Vec<RecvRef<i32>> = Vec::new();
+
+        while vals.len() < 4 {
+            match rx.try_recv_ref() {
+                Ok(val) => {
+                    if vals.len() == 2 && !hold.is_empty() {
+                        vals.push(*hold.pop().unwrap());
+                        vals.push(*val);
+                    } else if vals.len() == 1 && hold.is_empty() {
+                        hold.push(val);
+                    } else {
+                        vals.push(*val);
+                    }
+                }
+                Err(TryRecvError::Empty) => {}
+                Err(TryRecvError::Closed) => {
+                    panic!("channel closed");
+                }
+            }
+            thread::yield_now();
+        }
+
+        vals.sort_unstable();
+        assert_eq_dbg!(vals, vec![1, 2, 3, 4]);
+
+        p1.join().unwrap();
+        p2.join().unwrap();
+    })
+}
+
 #[test]
 fn rx_closes() {
     const ITERATIONS: usize = 6;
@@ -326,6 +390,40 @@ fn spsc_send_recv_in_order_wrap() {
     })
 }
 
+#[test]
+fn spsc_send_recv_in_order_skip_wrap() {
+    const N_SENDS: usize = 5;
+    loom::model(|| {
+        let (tx, rx) = blocking::channel::<usize>((N_SENDS + 1) / 2);
+        let consumer = thread::spawn(move || {
+            let mut hold = Vec::new();
+            assert_eq_dbg!(rx.recv(), Some(1));
+            loop {
+                match rx.try_recv_ref() {
+                    Ok(val) => {
+                        assert_eq_dbg!(*val, 2);
+                        hold.push(val);
+                        break;
+                    }
+                    Err(TryRecvError::Empty) => {
+                        loom::thread::yield_now();
+                    }
+                    Err(TryRecvError::Closed) => panic!("channel closed"),
+                }
+            }
+            for i in 3..=N_SENDS {
+                assert_eq_dbg!(rx.recv(), Some(i));
+            }
+            assert_eq_dbg!(rx.recv(), None);
+        });
+        for i in 1..=N_SENDS {
+            tx.send(i).unwrap();
+        }
+        drop(tx);
+        consumer.join().unwrap();
+    });
+}
+
 #[test]
 fn tx_close_wakes() {
     loom::model(|| {
diff --git a/src/thingbuf.rs b/src/thingbuf.rs
index 82f61be..e606e73 100644
--- a/src/thingbuf.rs
+++ b/src/thingbuf.rs
@@ -1,7 +1,4 @@
-use crate::{
-    recycling::{self, Recycle},
-    Core, Full, Ref, Slot,
-};
+use crate::{recycling::{self, Recycle}, Core, Full, Ref, Slot, MAX_CAPACITY};
 use alloc::boxed::Box;
 use core::fmt;
 
@@ -300,10 +297,17 @@ where
     /// Returns a new `ThingBuf` with space for `capacity` elements and
     /// the provided [recycling policy].
     ///
+    /// # Panics
+    ///
+    /// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This is the
+    /// largest value a `usize` can represent with its most significant bit, which is
+    /// reserved for internal use, cleared.
+    ///
     /// [recycling policy]: crate::recycling::Recycle
     #[must_use]
     pub fn with_recycle(capacity: usize, recycle: R) -> Self {
         assert!(capacity > 0);
+        assert!(capacity <= MAX_CAPACITY);
         Self {
             core: Core::new(capacity),
             slots: Slot::make_boxed_array(capacity),
diff --git a/tests/mpsc_blocking.rs b/tests/mpsc_blocking.rs
index 4d2fb96..d4a4e70 100644
--- a/tests/mpsc_blocking.rs
+++ b/tests/mpsc_blocking.rs
@@ -1,5 +1,6 @@
 use std::thread;
 use thingbuf::mpsc::blocking;
+use thingbuf::mpsc::errors::{TryRecvError, TrySendError};
 
 #[test]
 fn basically_works() {
@@ -70,3 +71,54 @@ fn tx_close_drains_queue() {
         producer.join().unwrap();
     }
 }
+
+#[test]
+fn spsc_skip_slot() {
+    let (tx, rx) = blocking::channel::<i32>(3);
+    // 0 lap
+    tx.send(0).unwrap();
+    assert_eq!(rx.recv(), Some(0));
+    tx.send(1).unwrap();
+    let msg_ref = rx.try_recv_ref().unwrap();
+    tx.send(2).unwrap();
+    assert_eq!(rx.recv(), Some(2));
+    // 1 lap
+    tx.send(3).unwrap();
+    assert_eq!(rx.recv(), Some(3));
+    tx.send(4).unwrap();
+    assert_eq!(rx.recv(), Some(4));
+    drop(msg_ref);
+    // 2 lap
+    tx.send(5).unwrap();
+    tx.send(6).unwrap();
+    tx.send(7).unwrap();
+    assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
+    assert_eq!(rx.recv(), Some(5));
+    assert_eq!(rx.recv(), Some(6));
+    assert_eq!(rx.recv(), Some(7));
+}
+
+#[test]
+fn spsc_full_after_skipped() {
+    let (tx, rx) = blocking::channel::<i32>(3);
+    // 0 lap
+    tx.send(0).unwrap();
+    assert_eq!(rx.recv(), Some(0));
+    tx.send(1).unwrap();
+    let _msg_ref = rx.try_recv_ref().unwrap();
+    tx.send(2).unwrap();
+    // 1 lap
+    tx.send(3).unwrap();
+    assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));
+}
+
+#[test]
+fn spsc_empty_after_skipped() {
+    let (tx, rx) = blocking::channel::<i32>(2);
+    // 0 lap
+    tx.send(0).unwrap();
+    tx.send(1).unwrap();
+    let _msg_ref = rx.try_recv_ref().unwrap();
+    assert_eq!(rx.recv(), Some(1));
+    assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
+}
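
The slot-state encoding is easiest to see in isolation. Below is a small, self-contained sketch — not part of the patch — that mirrors the `HAS_READER` constant and the helper functions added to `src/lib.rs` above, and checks the two properties the scheme relies on: the reader flag never disturbs the rest of the state, and generation arithmetic wraps within the usable bits without spilling into the flag.

```rust
// Standalone sketch mirroring the helpers added in src/lib.rs above.
const HAS_READER: usize = 1 << (usize::BITS - 1);
const MAX_CAPACITY: usize = usize::MAX & !HAS_READER;

fn check_has_reader(state: usize) -> bool {
    state & HAS_READER == HAS_READER
}

fn set_has_reader(state: usize) -> usize {
    state | HAS_READER
}

fn clear_has_reader(state: usize) -> usize {
    state & !HAS_READER
}

fn wrapping_add(a: usize, b: usize) -> usize {
    (a + b) & MAX_CAPACITY
}

fn main() {
    // Setting and clearing the flag leaves the rest of the state intact.
    let state = set_has_reader(42);
    assert!(check_has_reader(state));
    assert_eq!(clear_has_reader(state), 42);

    // Head/tail generation arithmetic wraps within the usable bits, so it
    // can never overflow into the reader flag.
    assert_eq!(wrapping_add(MAX_CAPACITY, 1), 0);
    assert!(!check_has_reader(wrapping_add(MAX_CAPACITY, 7)));
}
```

This is also why `MAX_CAPACITY` exists: with the MSB reserved for the flag, every head, tail, and per-slot state value must fit in the remaining bits.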
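And a condensed walk-through of the observable behavior the new integration tests pin down. This sketch recombines the steps of `spsc_full_after_skipped` and `spsc_skip_slot` above rather than reproducing either test exactly, and uses only the public blocking API they exercise (`send`, `recv`, `try_recv_ref`, `try_send_ref`):

```rust
use thingbuf::mpsc::blocking;
use thingbuf::mpsc::errors::TrySendError;

fn main() {
    let (tx, rx) = blocking::channel::<i32>(3);

    tx.send(0).unwrap();
    assert_eq!(rx.recv(), Some(0));

    tx.send(1).unwrap();
    // Hold on to element 1; its slot keeps the HAS_READER bit set.
    let held = rx.try_recv_ref().unwrap();
    assert_eq!(*held, 1);

    tx.send(2).unwrap();
    tx.send(3).unwrap();

    // The producer has wrapped around to the held slot and skipped it, so
    // the channel reports "full" even though only two values are queued.
    assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_))));

    // Consumption continues past the skipped slot in FIFO order.
    assert_eq!(rx.recv(), Some(2));
    assert_eq!(rx.recv(), Some(3));

    // Dropping the reference clears the flag and frees the slot for a later lap.
    drop(held);
    tx.send(4).unwrap();
    assert_eq!(rx.recv(), Some(4));
}
```

The `Full` error is the key observable here: a skipped slot stays unusable until the outstanding `RecvRef` is dropped, so callers holding references for a long time effectively reduce the channel's capacity.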