Skip to content

Commit

Permalink
Port crossbeam-rs#1146 & crossbeam-rs#1147 to deque::Injector and que…
Browse files Browse the repository at this point in the history
…ue::SegQueue
  • Loading branch information
taiki-e authored and al8n committed Dec 24, 2024
1 parent ab845d5 commit 52256c7
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 24 deletions.
35 changes: 23 additions & 12 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use std::boxed::Box;
use std::cell::{Cell, UnsafeCell};
use std::cmp;
Expand Down Expand Up @@ -1205,11 +1206,6 @@ struct Slot<T> {
}

impl<T> Slot<T> {
const UNINIT: Self = Self {
task: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};

/// Waits until a task is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
Expand All @@ -1231,12 +1227,27 @@ struct Block<T> {
}

impl<T> Block<T> {
/// Creates an empty block that starts at `start_index`.
fn new() -> Self {
Self {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
/// Creates an empty block.
fn new() -> Box<Self> {
let layout = Layout::new::<Self>();
assert!(
layout.size() != 0,
"Block should never be zero-sized, as it has an AtomicPtr field"
);
// SAFETY: layout is not zero-sized
let ptr = unsafe { alloc_zeroed(layout) };
// Handle allocation failure
if ptr.is_null() {
handle_alloc_error(layout)
}
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
// TODO: unsafe { Box::new_zeroed().assume_init() }
unsafe { Box::from_raw(ptr.cast()) }
}

/// Waits until the next pointer is set.
Expand Down Expand Up @@ -1315,7 +1326,7 @@ unsafe impl<T: Send> Sync for Injector<T> {}

impl<T> Default for Injector<T> {
fn default() -> Self {
let block = Box::into_raw(Box::new(Block::<T>::new()));
let block = Box::into_raw(Block::<T>::new());
Self {
head: CachePadded::new(Position {
block: AtomicPtr::new(block),
Expand Down Expand Up @@ -1376,7 +1387,7 @@ impl<T> Injector<T> {
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
next_block = Some(Block::<T>::new());
}

let new_tail = tail + (1 << SHIFT);
Expand Down
16 changes: 16 additions & 0 deletions crossbeam-deque/tests/injector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,19 @@ fn destructors() {
}
}
}

// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and
// probably overflow the thread stack. It's now directly created on the heap to avoid this.
#[test]
fn stack_overflow() {
const N: usize = 32_768;
struct BigStruct {
_data: [u8; N],
}

let q = Injector::new();

q.push(BigStruct { _data: [0u8; N] });

while !matches!(q.steal(), Empty) {}
}
35 changes: 23 additions & 12 deletions crossbeam-queue/src/seg_queue.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
Expand Down Expand Up @@ -36,11 +37,6 @@ struct Slot<T> {
}

impl<T> Slot<T> {
const UNINIT: Self = Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};

/// Waits until a value is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
Expand All @@ -62,12 +58,27 @@ struct Block<T> {
}

impl<T> Block<T> {
/// Creates an empty block that starts at `start_index`.
fn new() -> Self {
Self {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
/// Creates an empty block.
fn new() -> Box<Self> {
let layout = Layout::new::<Self>();
assert!(
layout.size() != 0,
"Block should never be zero-sized, as it has an AtomicPtr field"
);
// SAFETY: layout is not zero-sized
let ptr = unsafe { alloc_zeroed(layout) };
// Handle allocation failure
if ptr.is_null() {
handle_alloc_error(layout)
}
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
// TODO: unsafe { Box::new_zeroed().assume_init() }
unsafe { Box::from_raw(ptr.cast()) }
}

/// Waits until the next pointer is set.
Expand Down Expand Up @@ -209,12 +220,12 @@ impl<T> SegQueue<T> {
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
next_block = Some(Block::<T>::new());
}

// If this is the first push operation, we need to allocate the first block.
if block.is_null() {
let new = Box::into_raw(Box::new(Block::<T>::new()));
let new = Box::into_raw(Block::<T>::new());

if self
.tail
Expand Down
15 changes: 15 additions & 0 deletions crossbeam-queue/tests/seg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,18 @@ fn into_iter_drop() {
assert_eq!(i, j);
}
}

// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and
// probably overflow the thread stack. It's now directly created on the heap to avoid this.
#[test]
fn stack_overflow() {
const N: usize = 32_768;
struct BigStruct {
_data: [u8; N],
}

let q = SegQueue::new();
q.push(BigStruct { _data: [0u8; N] });

for _data in q.into_iter() {}
}

0 comments on commit 52256c7

Please sign in to comment.