From a8a35249c5347c0e6cac7e6c43a5e5b56b250625 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 15 Dec 2024 23:22:00 +0900 Subject: [PATCH] Port #1146 & #1147 to deque::Injector and queue::SegQueue --- crossbeam-deque/src/deque.rs | 35 ++++++++++++++++++++---------- crossbeam-deque/tests/injector.rs | 16 ++++++++++++++ crossbeam-queue/src/seg_queue.rs | 35 ++++++++++++++++++++---------- crossbeam-queue/tests/seg_queue.rs | 15 +++++++++++++ 4 files changed, 77 insertions(+), 24 deletions(-) diff --git a/crossbeam-deque/src/deque.rs b/crossbeam-deque/src/deque.rs index 7b18e151a..e207db641 100644 --- a/crossbeam-deque/src/deque.rs +++ b/crossbeam-deque/src/deque.rs @@ -1,3 +1,4 @@ +use std::alloc::{alloc_zeroed, handle_alloc_error, Layout}; use std::boxed::Box; use std::cell::{Cell, UnsafeCell}; use std::cmp; @@ -1205,11 +1206,6 @@ struct Slot { } impl Slot { - const UNINIT: Self = Self { - task: UnsafeCell::new(MaybeUninit::uninit()), - state: AtomicUsize::new(0), - }; - /// Waits until a task is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); @@ -1231,12 +1227,27 @@ struct Block { } impl Block { - /// Creates an empty block that starts at `start_index`. - fn new() -> Self { - Self { - next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + /// Creates an empty block. + fn new() -> Box { + let layout = Layout::new::(); + assert!( + layout.size() != 0, + "Block should never be zero-sized, as it has an AtomicPtr field" + ); + // SAFETY: layout is not zero-sized + let ptr = unsafe { alloc_zeroed(layout) }; + // Handle allocation failure + if ptr.is_null() { + handle_alloc_error(layout) } + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + // TODO: unsafe { Box::new_zeroed().assume_init() } + unsafe { Box::from_raw(ptr.cast()) } } /// Waits until the next pointer is set. @@ -1315,7 +1326,7 @@ unsafe impl Sync for Injector {} impl Default for Injector { fn default() -> Self { - let block = Box::into_raw(Box::new(Block::::new())); + let block = Box::into_raw(Block::::new()); Self { head: CachePadded::new(Position { block: AtomicPtr::new(block), @@ -1376,7 +1387,7 @@ impl Injector { // If we're going to have to install the next block, allocate it in advance in order to // make the wait for other threads as short as possible. if offset + 1 == BLOCK_CAP && next_block.is_none() { - next_block = Some(Box::new(Block::::new())); + next_block = Some(Block::::new()); } let new_tail = tail + (1 << SHIFT); diff --git a/crossbeam-deque/tests/injector.rs b/crossbeam-deque/tests/injector.rs index f706a8d9c..5f6c3e98e 100644 --- a/crossbeam-deque/tests/injector.rs +++ b/crossbeam-deque/tests/injector.rs @@ -373,3 +373,19 @@ fn destructors() { } } } + +// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and +// probably overflow the thread stack. It's now directly created on the heap to avoid this. +#[test] +fn stack_overflow() { + const N: usize = 32_768; + struct BigStruct { + _data: [u8; N], + } + + let q = Injector::new(); + + q.push(BigStruct { _data: [0u8; N] }); + + while !matches!(q.steal(), Empty) {} +} diff --git a/crossbeam-queue/src/seg_queue.rs b/crossbeam-queue/src/seg_queue.rs index 397c43d96..5e23a8133 100644 --- a/crossbeam-queue/src/seg_queue.rs +++ b/crossbeam-queue/src/seg_queue.rs @@ -1,3 +1,4 @@ +use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout}; use alloc::boxed::Box; use core::cell::UnsafeCell; use core::fmt; @@ -36,11 +37,6 @@ struct Slot { } impl Slot { - const UNINIT: Self = Self { - value: UnsafeCell::new(MaybeUninit::uninit()), - state: AtomicUsize::new(0), - }; - /// Waits until a value is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); @@ -62,12 +58,27 @@ struct Block { } impl Block { - /// Creates an empty block that starts at `start_index`. - fn new() -> Self { - Self { - next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + /// Creates an empty block. + fn new() -> Box { + let layout = Layout::new::(); + assert!( + layout.size() != 0, + "Block should never be zero-sized, as it has an AtomicPtr field" + ); + // SAFETY: layout is not zero-sized + let ptr = unsafe { alloc_zeroed(layout) }; + // Handle allocation failure + if ptr.is_null() { + handle_alloc_error(layout) } + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + // TODO: unsafe { Box::new_zeroed().assume_init() } + unsafe { Box::from_raw(ptr.cast()) } } /// Waits until the next pointer is set. @@ -209,12 +220,12 @@ impl SegQueue { // If we're going to have to install the next block, allocate it in advance in order to // make the wait for other threads as short as possible. if offset + 1 == BLOCK_CAP && next_block.is_none() { - next_block = Some(Box::new(Block::::new())); + next_block = Some(Block::::new()); } // If this is the first push operation, we need to allocate the first block. if block.is_null() { - let new = Box::into_raw(Box::new(Block::::new())); + let new = Box::into_raw(Block::::new()); if self .tail diff --git a/crossbeam-queue/tests/seg_queue.rs b/crossbeam-queue/tests/seg_queue.rs index bf5fb998a..d2ad1e472 100644 --- a/crossbeam-queue/tests/seg_queue.rs +++ b/crossbeam-queue/tests/seg_queue.rs @@ -193,3 +193,18 @@ fn into_iter_drop() { assert_eq!(i, j); } } + +// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and +// probably overflow the thread stack. It's now directly created on the heap to avoid this. +#[test] +fn stack_overflow() { + const N: usize = 32_768; + struct BigStruct { + _data: [u8; N], + } + + let q = SegQueue::new(); + q.push(BigStruct { _data: [0u8; N] }); + + for _data in q.into_iter() {} +}