diff --git a/crossbeam-channel/Cargo.toml b/crossbeam-channel/Cargo.toml index 2b212a93e..d9e436687 100644 --- a/crossbeam-channel/Cargo.toml +++ b/crossbeam-channel/Cargo.toml @@ -15,6 +15,9 @@ description = "Multi-producer multi-consumer channels for message passing" keywords = ["channel", "mpmc", "select", "golang", "message"] categories = ["algorithms", "concurrency", "data-structures"] +[dependencies] +maybe-uninit = "2.0.0" + [dependencies.crossbeam-utils] version = "0.7" path = "../crossbeam-utils" diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 1537c0939..659fce69a 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -22,6 +22,8 @@ use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; +use maybe_uninit::MaybeUninit; + use context::Context; use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use select::{Operation, SelectHandle, Selected, Token}; @@ -33,7 +35,7 @@ struct Slot { stamp: AtomicUsize, /// The message in this slot. - msg: UnsafeCell, + msg: UnsafeCell>, } /// The token type for the array flavor. @@ -233,7 +235,7 @@ impl Channel { let slot: &Slot = &*(token.array.slot as *const Slot); // Write the message into the slot and update the stamp. - slot.msg.get().write(msg); + slot.msg.get().write(MaybeUninit::new(msg)); slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping receiver. @@ -323,7 +325,7 @@ impl Channel { let slot: &Slot = &*(token.array.slot as *const Slot); // Read the message from the slot and update the stamp. - let msg = slot.msg.get().read(); + let msg = slot.msg.get().read().assume_init(); slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping sender. @@ -542,7 +544,12 @@ impl Drop for Channel { }; unsafe { - self.buffer.add(index).drop_in_place(); + let p = { + let slot = &mut *self.buffer.add(index); + let msg = &mut *slot.msg.get(); + msg.as_mut_ptr() + }; + p.drop_in_place(); } } diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index ae286d3e4..dd9e6e9de 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -2,13 +2,14 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop}; use std::ptr; use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; +use maybe_uninit::MaybeUninit; + use context::Context; use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use select::{Operation, SelectHandle, Selected, Token}; @@ -42,7 +43,7 @@ const MARK_BIT: usize = 1; /// A slot in a block. struct Slot { /// The message. - msg: UnsafeCell>, + msg: UnsafeCell>, /// The state of the slot. state: AtomicUsize, @@ -72,7 +73,13 @@ struct Block { impl Block { /// Creates an empty block. fn new() -> Block { - unsafe { mem::zeroed() } + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + unsafe { MaybeUninit::zeroed().assume_init() } } /// Waits until the next pointer is set. @@ -280,7 +287,7 @@ impl Channel { let block = token.list.block as *mut Block; let offset = token.list.offset; let slot = (*block).slots.get_unchecked(offset); - slot.msg.get().write(ManuallyDrop::new(msg)); + slot.msg.get().write(MaybeUninit::new(msg)); slot.state.fetch_or(WRITE, Ordering::Release); // Wake a sleeping receiver. @@ -385,8 +392,7 @@ impl Channel { let offset = token.list.offset; let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let m = slot.msg.get().read(); - let msg = ManuallyDrop::into_inner(m); + let msg = slot.msg.get().read().assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to destroy but // couldn't because we were busy reading from the slot. @@ -572,7 +578,8 @@ impl Drop for Channel { if offset < BLOCK_CAP { // Drop the message in the slot. let slot = (*block).slots.get_unchecked(offset); - ManuallyDrop::drop(&mut *(*slot).msg.get()); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Relaxed); diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index 1d5160056..898239632 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -348,6 +348,7 @@ #![warn(missing_debug_implementations)] extern crate crossbeam_utils; +extern crate maybe_uninit; mod channel; mod context; diff --git a/crossbeam-deque/Cargo.toml b/crossbeam-deque/Cargo.toml index a8dd915ae..35edd56df 100644 --- a/crossbeam-deque/Cargo.toml +++ b/crossbeam-deque/Cargo.toml @@ -15,6 +15,9 @@ description = "Concurrent work-stealing deque" keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"] categories = ["algorithms", "concurrency", "data-structures"] +[dependencies] +maybe-uninit = "2.0.0" + [dependencies.crossbeam-epoch] version = "0.8" path = "../crossbeam-epoch" diff --git a/crossbeam-deque/src/lib.rs b/crossbeam-deque/src/lib.rs index 70ffecc42..59004174d 100644 --- a/crossbeam-deque/src/lib.rs +++ b/crossbeam-deque/src/lib.rs @@ -92,12 +92,14 @@ extern crate crossbeam_epoch as epoch; extern crate crossbeam_utils as utils; +extern crate maybe_uninit; + use std::cell::{Cell, UnsafeCell}; use std::cmp; use std::fmt; use std::iter::FromIterator; use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop}; +use std::mem; use std::ptr; use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; @@ -105,6 +107,8 @@ use std::sync::Arc; use epoch::{Atomic, Owned}; use utils::{Backoff, CachePadded}; +use maybe_uninit::MaybeUninit; + // Minimum buffer capacity. const MIN_CAP: usize = 64; // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. @@ -218,7 +222,7 @@ impl Drop for Inner { // Go through the buffer from front to back and drop all tasks in the queue. let mut i = f; while i != b { - ptr::drop_in_place(buffer.deref().at(i)); + buffer.deref().at(i).drop_in_place(); i = i.wrapping_add(1); } @@ -1140,7 +1144,7 @@ const HAS_NEXT: usize = 1; /// A slot in a block. struct Slot { /// The task. - task: UnsafeCell>, + task: UnsafeCell>, /// The state of the slot. state: AtomicUsize, @@ -1170,7 +1174,13 @@ struct Block { impl Block { /// Creates an empty block that starts at `start_index`. fn new() -> Block { - unsafe { mem::zeroed() } + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + unsafe { MaybeUninit::zeroed().assume_init() } } /// Waits until the next pointer is set. @@ -1329,7 +1339,7 @@ impl Injector { // Write the task into the slot. let slot = (*block).slots.get_unchecked(offset); - slot.task.get().write(ManuallyDrop::new(task)); + slot.task.get().write(MaybeUninit::new(task)); slot.state.fetch_or(WRITE, Ordering::Release); return; @@ -1422,8 +1432,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. @@ -1548,8 +1557,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1561,8 +1569,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1704,8 +1711,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); match dest.flavor { Flavor::Fifo => { @@ -1714,8 +1720,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1728,8 +1733,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let m = slot.task.get().read(); - let task = ManuallyDrop::into_inner(m); + let task = slot.task.get().read().assume_init(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1804,7 +1808,8 @@ impl Drop for Injector { if offset < BLOCK_CAP { // Drop the task in the slot. let slot = (*block).slots.get_unchecked(offset); - ManuallyDrop::drop(&mut *(*slot).task.get()); + let p = &mut *slot.task.get(); + p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Relaxed); diff --git a/crossbeam-epoch/Cargo.toml b/crossbeam-epoch/Cargo.toml index d76b57935..629eacf12 100644 --- a/crossbeam-epoch/Cargo.toml +++ b/crossbeam-epoch/Cargo.toml @@ -24,6 +24,7 @@ sanitize = [] # Makes it more likely to trigger any potential data races. [dependencies] cfg-if = "0.1.2" +maybe-uninit = "2.0.0" memoffset = "0.5" [dependencies.crossbeam-utils] diff --git a/crossbeam-epoch/src/deferred.rs b/crossbeam-epoch/src/deferred.rs index 3d22ee633..a0970f115 100644 --- a/crossbeam-epoch/src/deferred.rs +++ b/crossbeam-epoch/src/deferred.rs @@ -4,6 +4,8 @@ use core::marker::PhantomData; use core::mem; use core::ptr; +use maybe_uninit::MaybeUninit; + /// Number of words a piece of `Data` can hold. /// /// Three words should be enough for the majority of cases. For example, you can fit inside it the @@ -36,11 +38,8 @@ impl Deferred { unsafe { if size <= mem::size_of::() && align <= mem::align_of::() { - // TODO(taiki-e): when the minimum supported Rust version is bumped to 1.36+, - // replace this with `mem::MaybeUninit`. - #[allow(deprecated)] - let mut data: Data = mem::uninitialized(); - ptr::write(&mut data as *mut Data as *mut F, f); + let mut data = MaybeUninit::::uninit(); + ptr::write(data.as_mut_ptr() as *mut F, f); unsafe fn call(raw: *mut u8) { let f: F = ptr::read(raw as *mut F); @@ -49,16 +48,13 @@ impl Deferred { Deferred { call: call::, - data, + data: data.assume_init(), _marker: PhantomData, } } else { let b: Box = Box::new(f); - // TODO(taiki-e): when the minimum supported Rust version is bumped to 1.36+, - // replace this with `mem::MaybeUninit`. - #[allow(deprecated)] - let mut data: Data = mem::uninitialized(); - ptr::write(&mut data as *mut Data as *mut Box, b); + let mut data = MaybeUninit::::uninit(); + ptr::write(data.as_mut_ptr() as *mut Box, b); unsafe fn call(raw: *mut u8) { let b: Box = ptr::read(raw as *mut Box); @@ -67,7 +63,7 @@ impl Deferred { Deferred { call: call::, - data, + data: data.assume_init(), _marker: PhantomData, } } diff --git a/crossbeam-epoch/src/lib.rs b/crossbeam-epoch/src/lib.rs index 2498f9dd8..282bbe90f 100644 --- a/crossbeam-epoch/src/lib.rs +++ b/crossbeam-epoch/src/lib.rs @@ -64,6 +64,8 @@ extern crate cfg_if; #[cfg(feature = "std")] extern crate core; +extern crate maybe_uninit; + cfg_if! { if #[cfg(feature = "alloc")] { extern crate alloc; diff --git a/crossbeam-epoch/src/sync/queue.rs b/crossbeam-epoch/src/sync/queue.rs index 08584002b..99fb6a1c4 100644 --- a/crossbeam-epoch/src/sync/queue.rs +++ b/crossbeam-epoch/src/sync/queue.rs @@ -8,12 +8,12 @@ //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a //! Practical Lock-Free Queue Algorithm. https://doi.org/10.1007/978-3-540-30232-2_7 -use core::mem::{self, ManuallyDrop}; -use core::ptr; use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crossbeam_utils::CachePadded; +use maybe_uninit::MaybeUninit; + use {unprotected, Atomic, Guard, Owned, Shared}; // The representation here is a singly-linked list, with a sentinel node at the front. In general @@ -25,15 +25,14 @@ pub struct Queue { tail: CachePadded>>, } -#[derive(Debug)] struct Node { /// The slot in which a value of type `T` can be stored. /// - /// The type of `data` is `ManuallyDrop` because a `Node` doesn't always contain a `T`. + /// The type of `data` is `MaybeUninit` because a `Node` doesn't always contain a `T`. /// For example, the sentinel node in a queue never contains a value: its slot is always empty. /// Other nodes start their life with a push operation and contain a value until it gets popped /// out. After that such empty nodes get added to the collector for destruction. - data: ManuallyDrop, + data: MaybeUninit, next: Atomic>, } @@ -49,11 +48,8 @@ impl Queue { head: CachePadded::new(Atomic::null()), tail: CachePadded::new(Atomic::null()), }; - // TODO(taiki-e): when the minimum supported Rust version is bumped to 1.36+, - // replace this with `mem::MaybeUninit`. - #[allow(deprecated)] let sentinel = Owned::new(Node { - data: unsafe { mem::uninitialized() }, + data: MaybeUninit::uninit(), next: Atomic::null(), }); unsafe { @@ -93,7 +89,7 @@ impl Queue { /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. pub fn push(&self, t: T, guard: &Guard) { let new = Owned::new(Node { - data: ManuallyDrop::new(t), + data: MaybeUninit::new(t), next: Atomic::null(), }); let new = Owned::into_shared(new, guard); @@ -126,7 +122,8 @@ impl Queue { let _ = self.tail.compare_and_set(tail, next, Release, guard); } guard.defer_destroy(head); - Some(ManuallyDrop::into_inner(ptr::read(&n.data))) + // TODO: Replace with MaybeUninit::read when api is stable + Some(n.data.as_ptr().read()) }) .map_err(|_| ()) }, @@ -146,7 +143,7 @@ impl Queue { let h = unsafe { head.deref() }; let next = h.next.load(Acquire, guard); match unsafe { next.as_ref() } { - Some(n) if condition(&n.data) => unsafe { + Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { self.head .compare_and_set(head, next, Release, guard) .map(|_| { @@ -156,7 +153,7 @@ impl Queue { let _ = self.tail.compare_and_set(tail, next, Release, guard); } guard.defer_destroy(head); - Some(ManuallyDrop::into_inner(ptr::read(&n.data))) + Some(n.data.as_ptr().read()) }) .map_err(|_| ()) }, diff --git a/crossbeam-queue/Cargo.toml b/crossbeam-queue/Cargo.toml index 0e50c878b..7b5c97920 100644 --- a/crossbeam-queue/Cargo.toml +++ b/crossbeam-queue/Cargo.toml @@ -22,6 +22,7 @@ alloc = ["crossbeam-utils/alloc"] [dependencies] cfg-if = "0.1.2" +maybe-uninit = "2.0.0" [dependencies.crossbeam-utils] version = "0.7" diff --git a/crossbeam-queue/src/array_queue.rs b/crossbeam-queue/src/array_queue.rs index 683daad8b..45b055abd 100644 --- a/crossbeam-queue/src/array_queue.rs +++ b/crossbeam-queue/src/array_queue.rs @@ -18,6 +18,8 @@ use core::sync::atomic::{self, AtomicUsize, Ordering}; use crossbeam_utils::{Backoff, CachePadded}; +use maybe_uninit::MaybeUninit; + use err::{PopError, PushError}; /// A slot in a queue. @@ -29,7 +31,7 @@ struct Slot { stamp: AtomicUsize, /// The value in this slot. - value: UnsafeCell, + value: UnsafeCell>, } /// A bounded multi-producer multi-consumer queue. @@ -187,7 +189,7 @@ impl ArrayQueue { Ok(_) => { // Write the value into the slot and update the stamp. unsafe { - slot.value.get().write(value); + slot.value.get().write(MaybeUninit::new(value)); } slot.stamp.store(tail + 1, Ordering::Release); return Ok(()); @@ -266,7 +268,7 @@ impl ArrayQueue { ) { Ok(_) => { // Read the value from the slot and update the stamp. - let msg = unsafe { slot.value.get().read() }; + let msg = unsafe { slot.value.get().read().assume_init() }; slot.stamp .store(head.wrapping_add(self.one_lap), Ordering::Release); return Ok(msg); @@ -415,7 +417,12 @@ impl Drop for ArrayQueue { }; unsafe { - self.buffer.add(index).drop_in_place(); + let p = { + let slot = &mut *self.buffer.add(index); + let value = &mut *slot.value.get(); + value.as_mut_ptr() + }; + p.drop_in_place(); } } diff --git a/crossbeam-queue/src/lib.rs b/crossbeam-queue/src/lib.rs index 2eda6b0df..fd9939207 100644 --- a/crossbeam-queue/src/lib.rs +++ b/crossbeam-queue/src/lib.rs @@ -17,6 +17,8 @@ extern crate cfg_if; #[cfg(feature = "std")] extern crate core; +extern crate maybe_uninit; + cfg_if! { if #[cfg(feature = "alloc")] { extern crate alloc; diff --git a/crossbeam-queue/src/seg_queue.rs b/crossbeam-queue/src/seg_queue.rs index 2ddfe8b3f..b52da4f5c 100644 --- a/crossbeam-queue/src/seg_queue.rs +++ b/crossbeam-queue/src/seg_queue.rs @@ -2,12 +2,13 @@ use alloc::boxed::Box; use core::cell::UnsafeCell; use core::fmt; use core::marker::PhantomData; -use core::mem::{self, ManuallyDrop}; use core::ptr; use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use crossbeam_utils::{Backoff, CachePadded}; +use maybe_uninit::MaybeUninit; + use err::PopError; // Bits indicating the state of a slot: @@ -30,7 +31,7 @@ const HAS_NEXT: usize = 1; /// A slot in a block. struct Slot { /// The value. - value: UnsafeCell>, + value: UnsafeCell>, /// The state of the slot. state: AtomicUsize, @@ -60,7 +61,13 @@ struct Block { impl Block { /// Creates an empty block that starts at `start_index`. fn new() -> Block { - unsafe { mem::zeroed() } + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + unsafe { MaybeUninit::zeroed().assume_init() } } /// Waits until the next pointer is set. @@ -244,7 +251,7 @@ impl SegQueue { // Write the value into the slot. let slot = (*block).slots.get_unchecked(offset); - slot.value.get().write(ManuallyDrop::new(value)); + slot.value.get().write(MaybeUninit::new(value)); slot.state.fetch_or(WRITE, Ordering::Release); return; @@ -339,8 +346,7 @@ impl SegQueue { // Read the value. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let m = slot.value.get().read(); - let value = ManuallyDrop::into_inner(m); + let value = slot.value.get().read().assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to // destroy but couldn't because we were busy reading from the slot. @@ -451,7 +457,8 @@ impl Drop for SegQueue { if offset < BLOCK_CAP { // Drop the value in the slot. let slot = (*block).slots.get_unchecked(offset); - ManuallyDrop::drop(&mut *(*slot).value.get()); + let p = &mut *slot.value.get(); + p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Relaxed);