From a72a2869237c79cd81b58bfcd18b5ded39430cd6 Mon Sep 17 00:00:00 2001 From: "Timothy N. Tsvetkov" <33668+tukan@users.noreply.github.com> Date: Sat, 6 Apr 2024 19:56:45 +0300 Subject: [PATCH 1/3] fix: skip slots with active reading `Ref`s in `push_ref` (#81) `Core::push_ref` can go into an (almost infinite) spin loop waiting for a `Ref` (created in `pop_ref`) to this slot to be dropped. This behaviour can lead to writing being blocked unless all refs are dropped, even if we have free space in the buffer. In this PR I've added a mechanism to skip such slots (by updating their `state`) and attempt to write into them on the next lap. Fixes #83 Closes #80 --- .github/workflows/ci.yml | 1 + src/lib.rs | 177 +++++++++++++++++++++++--------- src/mpsc/async_impl.rs | 9 +- src/mpsc/blocking.rs | 21 ++-- src/mpsc/tests/mpsc_async.rs | 101 ++++++++++++++++++ src/mpsc/tests/mpsc_blocking.rs | 98 ++++++++++++++++++ src/thingbuf.rs | 12 ++- tests/mpsc_blocking.rs | 52 ++++++++++ 8 files changed, 409 insertions(+), 62 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e983a1a..b7e860d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -170,6 +170,7 @@ jobs: - mpsc_send_recv_wrap - mpsc_try_send_recv - mpsc_try_recv_ref + - mpsc_test_skip_slot - mpsc_async::rx_close_unconsumed - mpsc_blocking::rx_close_unconsumed name: model '${{ matrix.model }}'' diff --git a/src/lib.rs b/src/lib.rs index 228ffe8..e3b2eee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ #![doc = include_str!("../README.md")] #![warn(missing_docs)] use core::{cmp, fmt, mem::MaybeUninit, ops, ptr}; + #[macro_use] mod macros; @@ -43,6 +44,13 @@ use crate::{ util::{Backoff, CachePadded}, }; +const HAS_READER: usize = 1 << (usize::BITS - 1); + +/// Maximum capacity of a `ThingBuf`. This is the largest number of elements that +/// can be stored in a `ThingBuf`. This is the highest power of two that can be expressed by a +/// `usize`, excluding the most significant bit reserved for the "has reader" flag. +pub const MAX_CAPACITY: usize = usize::MAX & !HAS_READER; + /// A reference to an entry in a [`ThingBuf`]. /// /// A `Ref` represents the exclusive permission to mutate a given element in a @@ -62,6 +70,7 @@ pub struct Ref<'slot, T> { ptr: MutPtr>, slot: &'slot Slot, new_state: usize, + is_pop: bool, } /// Error indicating that a `push` operation failed because a queue was at @@ -100,12 +109,19 @@ struct Core { struct Slot { value: UnsafeCell>, + /// Each slot's state has two components: a flag indicated by the most significant bit (MSB), and the rest of the state. + /// The MSB is set when a reader is reading from this slot. + /// The rest of the state helps determine the availability of the slot for reading or writing: + /// - A slot is available for reading when the state (excluding the MSB) equals head + 1. + /// - A slot is available for writing when the state (excluding the MSB) equals tail. + /// At initialization, each slot's state is set to its ordinal index. state: AtomicUsize, } impl Core { #[cfg(not(all(loom, test)))] const fn new(capacity: usize) -> Self { + assert!(capacity <= MAX_CAPACITY); let closed = (capacity + 1).next_power_of_two(); let idx_mask = closed - 1; let gen = closed << 1; @@ -156,7 +172,7 @@ impl Core { } else { // We've reached the end of the current lap, wrap the index around // to 0. - gen.wrapping_add(self.gen) + wrapping_add(gen, self.gen) } } @@ -184,8 +200,7 @@ impl Core { { test_println!("push_ref"); let mut backoff = Backoff::new(); - let mut tail = self.tail.load(Relaxed); - + let mut tail = test_dbg!(self.tail.load(Relaxed)); loop { if test_dbg!(tail & self.closed != 0) { return Err(TrySendError::Closed(())); @@ -210,21 +225,43 @@ impl Core { ); slots.get_unchecked(idx) }; - let state = test_dbg!(slot.state.load(Acquire)); - + let raw_state = test_dbg!(slot.state.load(SeqCst)); + let state = test_dbg!(clear_has_reader(raw_state)); + // slot is writable if test_dbg!(state == tail) { - // Move the tail index forward by 1. let next_tail = self.next(idx, gen); + // try to advance the tail match test_dbg!(self .tail .compare_exchange_weak(tail, next_tail, SeqCst, Acquire)) { + Ok(_) if test_dbg!(check_has_reader(raw_state)) => { + test_println!( + "advanced tail {} to {}; has an active reader, skipping slot [{}]", + tail, + next_tail, + idx + ); + let next_state = wrapping_add(tail, self.gen); + test_dbg!(slot + .state + .fetch_update(SeqCst, SeqCst, |state| { + Some(state & HAS_READER | next_state) + }) + .unwrap_or_else(|_| unreachable!())); + backoff.spin(); + continue; + } Ok(_) => { // We got the slot! It's now okay to write to it - test_println!("claimed tail slot [{}]", idx); + test_println!( + "advanced tail {} to {}; claimed slot [{}]", + tail, + next_tail, + idx + ); // Claim exclusive ownership over the slot let ptr = slot.value.get_mut(); - // Initialize or recycle the element. unsafe { // Safety: we have just claimed exclusive ownership over @@ -240,16 +277,17 @@ impl Core { test_println!("-> recycled"); } } - return Ok(Ref { ptr, new_state: tail + 1, slot, + is_pop: false, }); } Err(actual) => { // Someone else took this slot and advanced the tail // index. Try to claim the new tail. + test_println!("failed to advance tail {} to {}", tail, next_tail); tail = actual; backoff.spin(); continue; @@ -257,25 +295,21 @@ impl Core { } } - if test_dbg!(state.wrapping_add(self.gen) == tail + 1) { - // fake RMW op to placate loom. this should be equivalent to - // doing a relaxed load after a SeqCst fence (per Godbolt - // https://godbolt.org/z/zb15qfEa9), however, loom understands - // this correctly, while it does not understand an explicit - // SeqCst fence and a load. - // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a - // load it gets reordered differently in the model checker lmao... - let head = test_dbg!(self.head.fetch_or(0, SeqCst)); - if test_dbg!(head.wrapping_add(self.gen) == tail) { - test_println!("channel full"); - return Err(TrySendError::Full(())); - } - - backoff.spin(); - } else { - backoff.spin_yield(); + // check if we have any available slots + // fake RMW op to placate loom. this should be equivalent to + // doing a relaxed load after a SeqCst fence (per Godbolt + // https://godbolt.org/z/zb15qfEa9), however, loom understands + // this correctly, while it does not understand an explicit + // SeqCst fence and a load. + // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a + // load it gets reordered differently in the model checker lmao... + let head = test_dbg!(self.head.fetch_or(0, SeqCst)); + if test_dbg!(wrapping_add(head, self.gen) == tail) { + test_println!("channel full"); + return Err(TrySendError::Full(())); } + backoff.spin_yield(); tail = test_dbg!(self.tail.load(Acquire)); } } @@ -308,33 +342,39 @@ impl Core { ); slots.get_unchecked(idx) }; - let state = test_dbg!(slot.state.load(Acquire)); - // If the slot's state is ahead of the head index by one, we can pop - // it. - if test_dbg!(state == head + 1) { - let next_head = self.next(idx, gen); + let raw_state = test_dbg!(slot.state.load(Acquire)); + let next_head = self.next(idx, gen); + + // If the slot's state is ahead of the head index by one, we can pop it. + if test_dbg!(raw_state == head + 1) { + // try to advance the head index match test_dbg!(self .head .compare_exchange_weak(head, next_head, SeqCst, Acquire)) { Ok(_) => { - test_println!("claimed head slot [{}]", idx); + test_println!("advanced head {} to {}", head, next_head); + test_println!("claimed slot [{}]", idx); + let mut new_state = wrapping_add(head, self.gen); + new_state = set_has_reader(new_state); + test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst)); return Ok(Ref { - new_state: head.wrapping_add(self.gen), + new_state, ptr: slot.value.get_mut(), slot, + is_pop: true, }); } Err(actual) => { + test_println!("failed to advance head, head={}, actual={}", head, actual); head = actual; backoff.spin(); continue; } } - } - - if test_dbg!(state == head) { + } else { + // Maybe we reached the tail index? If so, the buffer is empty. // fake RMW op to placate loom. this should be equivalent to // doing a relaxed load after a SeqCst fence (per Godbolt // https://godbolt.org/z/zb15qfEa9), however, loom understands @@ -342,9 +382,7 @@ impl Core { // SeqCst fence and a load. // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a // load it gets reordered differently in the model checker lmao... - let tail = test_dbg!(self.tail.fetch_or(0, SeqCst)); - if test_dbg!(tail & !self.closed == head) { return if test_dbg!(tail & self.closed != 0) { Err(TryRecvError::Closed) @@ -354,16 +392,32 @@ impl Core { }; } - if test_dbg!(backoff.done_spinning()) { - return Err(TryRecvError::Empty); + // Is anyone writing to the slot from this generation? + if test_dbg!(raw_state == head) { + if test_dbg!(backoff.done_spinning()) { + return Err(TryRecvError::Empty); + } + backoff.spin(); + continue; } - backoff.spin(); - } else { - backoff.spin_yield(); + // The slot is in an invalid state (was skipped). Try to advance the head index. + match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) { + Ok(_) => { + test_println!("skipped head slot [{}], new head={}", idx, next_head); + } + Err(actual) => { + test_println!( + "failed to skip head slot [{}], head={}, actual={}", + idx, + head, + actual + ); + head = actual; + backoff.spin(); + } + } } - - head = test_dbg!(self.head.load(Acquire)); } } @@ -409,6 +463,26 @@ impl Core { } } +#[inline] +fn check_has_reader(state: usize) -> bool { + state & HAS_READER == HAS_READER +} + +#[inline] +fn set_has_reader(state: usize) -> usize { + state | HAS_READER +} + +#[inline] +fn clear_has_reader(state: usize) -> usize { + state & !HAS_READER +} + +#[inline] +fn wrapping_add(a: usize, b: usize) -> usize { + (a + b) & MAX_CAPACITY +} + impl Drop for Core { fn drop(&mut self) { debug_assert!( @@ -475,8 +549,17 @@ impl ops::DerefMut for Ref<'_, T> { impl Drop for Ref<'_, T> { #[inline] fn drop(&mut self) { - test_println!("drop Ref<{}>", core::any::type_name::()); - test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release)); + if self.is_pop { + test_println!("drop Ref<{}> (pop)", core::any::type_name::()); + test_dbg!(self.slot.state.fetch_and(!HAS_READER, SeqCst)); + } else { + test_println!( + "drop Ref<{}> (push), new_state = {}", + core::any::type_name::(), + self.new_state + ); + test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release)); + } } } diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 9f3431b..dbe3192 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -15,7 +15,7 @@ use errors::*; feature! { #![feature = "alloc"] - use crate::loom::sync::Arc; + use crate::{MAX_CAPACITY, loom::sync::Arc}; use alloc::boxed::Box; /// Returns a new asynchronous multi-producer, single consumer (MPSC) @@ -32,10 +32,17 @@ feature! { /// Returns a new asynchronous multi-producer, single consumer (MPSC) /// channel with the provided capacity and [recycling policy]. /// + /// # Panics + /// + /// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This value + /// represents the highest power of two that can be expressed by a `usize`, excluding the most + /// significant bit. + /// /// [recycling policy]: crate::recycling::Recycle #[must_use] pub fn with_recycle>(capacity: usize, recycle: R) -> (Sender, Receiver) { assert!(capacity > 0); + assert!(capacity <= MAX_CAPACITY); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index cb6f037..9a85884 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -4,16 +4,11 @@ //! [`Receiver`] types in this module wait by blocking the current thread, //! rather than asynchronously yielding. use super::*; -use crate::{ - loom::{ - atomic::{self, Ordering}, - sync::Arc, - thread::{self, Thread}, - }, - recycling::{self, Recycle}, - util::Backoff, - wait::queue, -}; +use crate::{loom::{ + atomic::{self, Ordering}, + sync::Arc, + thread::{self, Thread}, +}, MAX_CAPACITY, recycling::{self, Recycle}, util::Backoff, wait::queue}; use core::{fmt, pin::Pin}; use errors::*; use std::time::{Duration, Instant}; @@ -32,6 +27,11 @@ pub fn channel(capacity: usize) -> (Sender, Receiver) /// Returns a new synchronous multi-producer, single consumer channel with /// the provided [recycling policy]. /// +/// # Panics +/// +/// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This value represents +/// the highest power of two that can be expressed by a `usize`, excluding the most significant bit. +/// /// [recycling policy]: crate::recycling::Recycle #[must_use] pub fn with_recycle>( @@ -39,6 +39,7 @@ pub fn with_recycle>( recycle: R, ) -> (Sender, Receiver) { assert!(capacity > 0); + assert!(capacity <= MAX_CAPACITY); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index 2648146..b720540 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -75,6 +75,70 @@ fn mpsc_try_recv_ref() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_test_skip_slot() { + // This test emulates a situation where we might need to skip a slot. The setup includes two writing + // threads that write elements to the channel and one reading thread that maintains a RecvRef to the + // third element until the end of the test, necessitating the skip: + // Given that the channel capacity is 2, here's the sequence of operations: + // Thread 1 writes: 1, 2 + // Thread 2 writes: 3, 4 + // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. + // As a result, the third slot is skipped during this process. + loom::model(|| { + let (tx, rx) = channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + future::block_on(async move { + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + }) + }) + }; + + let p2 = { + thread::spawn(move || { + future::block_on(async move { + tx.send(3).await.unwrap(); + tx.send(4).await.unwrap(); + }) + }) + }; + + let mut vals = Vec::new(); + let mut hold: Vec> = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => { + if vals.len() == 2 && !hold.is_empty() { + vals.push(*hold.pop().unwrap()); + vals.push(*val); + } else if vals.len() == 1 && hold.is_empty() { + hold.push(val); + } else { + vals.push(*val); + } + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => { + panic!("channel closed"); + } + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; @@ -278,6 +342,43 @@ fn spsc_send_recv_in_order_wrap() { }) } +#[test] +fn spsc_send_recv_in_order_skip_wrap() { + const N_SENDS: usize = 5; + loom::model(|| { + let (tx, rx) = channel::((N_SENDS + 1) / 2); + let consumer = thread::spawn(move || { + future::block_on(async move { + let mut hold = Vec::new(); + assert_eq_dbg!(rx.recv().await, Some(1)); + loop { + match rx.try_recv_ref() { + Ok(val) => { + assert_eq_dbg!(*val, 2); + hold.push(val); + break; + } + Err(TryRecvError::Empty) => { + loom::thread::yield_now(); + } + Err(TryRecvError::Closed) => panic!("channel closed"), + } + } + for i in 3..=N_SENDS { + assert_eq_dbg!(rx.recv().await, Some(i)); + } + assert_eq_dbg!(rx.recv().await, None); + }); + }); + future::block_on(async move { + for i in 1..=N_SENDS { + tx.send(i).await.unwrap(); + } + }); + consumer.join().unwrap(); + }); +} + #[test] #[cfg_attr(ci_skip_slow_models, ignore)] fn mpsc_send_recv_wrap() { diff --git a/src/mpsc/tests/mpsc_blocking.rs b/src/mpsc/tests/mpsc_blocking.rs index 89b6f0d..533f583 100644 --- a/src/mpsc/tests/mpsc_blocking.rs +++ b/src/mpsc/tests/mpsc_blocking.rs @@ -1,5 +1,7 @@ use super::*; use crate::loom::{self, alloc::Track, thread}; +use crate::mpsc::blocking; +use crate::mpsc::blocking::RecvRef; #[test] #[cfg_attr(ci_skip_slow_models, ignore)] @@ -71,6 +73,68 @@ fn mpsc_try_recv_ref() { }) } +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_test_skip_slot() { + // This test emulates a situation where we might need to skip a slot. The setup includes two writing + // threads that write elements to the channel and one reading thread that maintains a RecvRef to the + // third element until the end of the test, necessitating the skip: + // Given that the channel capacity is 2, here's the sequence of operations: + // Thread 1 writes: 1, 2 + // Thread 2 writes: 3, 4 + // The main thread reads from slots in this order: 0, 1, 0 (holds ref), 1, 1. + // As a result, the third slot is skipped during this process. + loom::model(|| { + let (tx, rx) = blocking::channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.send_ref().unwrap() = 1; + thread::yield_now(); + *tx.send_ref().unwrap() = 2; + }) + }; + + let p2 = { + thread::spawn(move || { + *tx.send_ref().unwrap() = 3; + thread::yield_now(); + *tx.send_ref().unwrap() = 4; + }) + }; + + let mut vals = Vec::new(); + let mut hold: Vec> = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => { + if vals.len() == 2 && !hold.is_empty() { + vals.push(*hold.pop().unwrap()); + vals.push(*val); + } else if vals.len() == 1 && hold.is_empty() { + hold.push(val); + } else { + vals.push(*val); + } + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => { + panic!("channel closed"); + } + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + #[test] fn rx_closes() { const ITERATIONS: usize = 6; @@ -326,6 +390,40 @@ fn spsc_send_recv_in_order_wrap() { }) } +#[test] +fn spsc_send_recv_in_order_skip_wrap() { + const N_SENDS: usize = 5; + loom::model(|| { + let (tx, rx) = blocking::channel::((N_SENDS + 1) / 2); + let consumer = thread::spawn(move || { + let mut hold = Vec::new(); + assert_eq_dbg!(rx.recv(), Some(1)); + loop { + match rx.try_recv_ref() { + Ok(val) => { + assert_eq_dbg!(*val, 2); + hold.push(val); + break; + } + Err(TryRecvError::Empty) => { + loom::thread::yield_now(); + } + Err(TryRecvError::Closed) => panic!("channel closed"), + } + } + for i in 3..=N_SENDS { + assert_eq_dbg!(rx.recv(), Some(i)); + } + assert_eq_dbg!(rx.recv(), None); + }); + for i in 1..=N_SENDS { + tx.send(i).unwrap(); + } + drop(tx); + consumer.join().unwrap(); + }); +} + #[test] fn tx_close_wakes() { loom::model(|| { diff --git a/src/thingbuf.rs b/src/thingbuf.rs index 82f61be..e606e73 100644 --- a/src/thingbuf.rs +++ b/src/thingbuf.rs @@ -1,7 +1,4 @@ -use crate::{ - recycling::{self, Recycle}, - Core, Full, Ref, Slot, -}; +use crate::{recycling::{self, Recycle}, Core, Full, Ref, Slot, MAX_CAPACITY}; use alloc::boxed::Box; use core::fmt; @@ -300,10 +297,17 @@ where /// Returns a new `ThingBuf` with space for `capacity` elements and /// the provided [recycling policy]. /// + /// # Panics + /// + /// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This value + /// represents the highest power of two that can be expressed by a `usize`, excluding the most + /// significant bit. + /// /// [recycling policy]: crate::recycling::Recycle #[must_use] pub fn with_recycle(capacity: usize, recycle: R) -> Self { assert!(capacity > 0); + assert!(capacity <= MAX_CAPACITY); Self { core: Core::new(capacity), slots: Slot::make_boxed_array(capacity), diff --git a/tests/mpsc_blocking.rs b/tests/mpsc_blocking.rs index 4d2fb96..d4a4e70 100644 --- a/tests/mpsc_blocking.rs +++ b/tests/mpsc_blocking.rs @@ -1,5 +1,6 @@ use std::thread; use thingbuf::mpsc::blocking; +use thingbuf::mpsc::errors::{TryRecvError, TrySendError}; #[test] fn basically_works() { @@ -70,3 +71,54 @@ fn tx_close_drains_queue() { producer.join().unwrap(); } } + +#[test] +fn spsc_skip_slot() { + let (tx, rx) = blocking::channel::(3); + // 0 lap + tx.send(0).unwrap(); + assert_eq!(rx.recv(), Some(0)); + tx.send(1).unwrap(); + let msg_ref = rx.try_recv_ref().unwrap(); + tx.send(2).unwrap(); + assert_eq!(rx.recv(), Some(2)); + // 1 lap + tx.send(3).unwrap(); + assert_eq!(rx.recv(), Some(3)); + tx.send(4).unwrap(); + assert_eq!(rx.recv(), Some(4)); + drop(msg_ref); + // 2 lap + tx.send(5).unwrap(); + tx.send(6).unwrap(); + tx.send(7).unwrap(); + assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_)))); + assert_eq!(rx.recv(), Some(5)); + assert_eq!(rx.recv(), Some(6)); + assert_eq!(rx.recv(), Some(7)); +} + +#[test] +fn spsc_full_after_skipped() { + let (tx, rx) = blocking::channel::(3); + // 0 lap + tx.send(0).unwrap(); + assert_eq!(rx.recv(), Some(0)); + tx.send(1).unwrap(); + let _msg_ref = rx.try_recv_ref().unwrap(); + tx.send(2).unwrap(); + // lap 1 + tx.send(3).unwrap(); + assert!(matches!(tx.try_send_ref(), Err(TrySendError::Full(_)))); +} + +#[test] +fn spsc_empty_after_skipped() { + let (tx, rx) = blocking::channel::(2); + // 0 lap + tx.send(0).unwrap(); + tx.send(1).unwrap(); + let _msg_ref = rx.try_recv_ref().unwrap(); + assert_eq!(rx.recv(), Some(1)); + assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); +} From ac1eafccbdff39e7be22823184daf44f8ee99d12 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:00:44 -0700 Subject: [PATCH 2/3] fix: unused import with `alloc` enabled This fixes an unused import in the `loom` module when the `alloc` feature flag is enabled. This should placate CI. --- src/loom.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/loom.rs b/src/loom.rs index 7b67828..4c0d147 100644 --- a/src/loom.rs +++ b/src/loom.rs @@ -204,6 +204,7 @@ mod inner { mod inner { #![allow(dead_code)] pub(crate) mod sync { + #[allow(unused_imports)] pub use core::sync::*; #[cfg(feature = "alloc")] From c839b602402d30c2858c6de83a3999b8ff7e8a45 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:02:15 -0700 Subject: [PATCH 3/3] chore: add `.direnv` to `.gitignore` `nix-direnv` puts stuff in `.direnv`. We don't want this in Git. --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 96ef6c0..7d4d835 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target Cargo.lock +.direnv