From d409fc378ca46fca6fb0b0b986b0f11cfc62eb8f Mon Sep 17 00:00:00 2001 From: Corey Farwell Date: Mon, 12 Dec 2016 21:03:08 -0500 Subject: [PATCH 01/10] Rewrite, improve documentation for `core::hash::BuildHasherDefault`. Fixes https://github.com/rust-lang/rust/issues/31242. --- src/libcore/hash/mod.rs | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/libcore/hash/mod.rs b/src/libcore/hash/mod.rs index ac36cbaace7a8..18b465d85a12f 100644 --- a/src/libcore/hash/mod.rs +++ b/src/libcore/hash/mod.rs @@ -255,10 +255,44 @@ pub trait BuildHasher { fn build_hasher(&self) -> Self::Hasher; } -/// A structure which implements `BuildHasher` for all `Hasher` types which also -/// implement `Default`. +/// The `BuildHasherDefault` structure is used in scenarios where one has a +/// type that implements [`Hasher`] and [`Default`], but needs that type to +/// implement [`BuildHasher`]. /// -/// This struct is 0-sized and does not need construction. +/// This structure is zero-sized and does not need construction. +/// +/// # Examples +/// +/// Using `BuildHasherDefault` to specify a custom [`BuildHasher`] for +/// [`HashMap`]: +/// +/// ``` +/// use std::collections::HashMap; +/// use std::hash::{BuildHasherDefault, Hasher}; +/// +/// #[derive(Default)] +/// struct MyHasher; +/// +/// impl Hasher for MyHasher { +/// fn write(&mut self, bytes: &[u8]) { +/// // Your hashing algorithm goes here! +/// unimplemented!() +/// } +/// +/// fn finish(&self) -> u64 { +/// // Your hashing algorithm goes here! +/// unimplemented!() +/// } +/// } +/// +/// type MyBuildHasher = BuildHasherDefault; +/// +/// let hash_map = HashMap::::default(); +/// ``` +/// +/// [`BuildHasher`]: trait.BuildHasher.html +/// [`Default`]: ../default/trait.Default.html +/// [`Hasher`]: trait.Hasher.html #[stable(since = "1.7.0", feature = "build_hasher")] pub struct BuildHasherDefault(marker::PhantomData); From e900f6c7fe8c077f3a94500c70eee9cfaaa8eff5 Mon Sep 17 00:00:00 2001 From: Ralph Giles Date: Fri, 16 Dec 2016 10:54:48 -0800 Subject: [PATCH 02/10] rustc: Disable NEON on armv7 android. We thought Google's ABI for arvm7 required neon, but it is currently optional, perhaps because there is a significant population of Tegra 2 devices still in use. This turns off neon code generation outside #[target-feature] blocks just like we do on armv7-unknown-linux-gnu, but unlike most other armv7 targets. LLVM defaults to +neon for this target, so an explicit disable is necessary. See https://developer.android.com/ndk/guides/abis.html#v7a for instruction set extension requirements. Closes #38402. --- src/librustc_back/target/armv7_linux_androideabi.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/librustc_back/target/armv7_linux_androideabi.rs b/src/librustc_back/target/armv7_linux_androideabi.rs index 42f0deaa3fbff..598b961a3048a 100644 --- a/src/librustc_back/target/armv7_linux_androideabi.rs +++ b/src/librustc_back/target/armv7_linux_androideabi.rs @@ -1,4 +1,4 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// Copyright 2016 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // @@ -12,7 +12,7 @@ use target::{Target, TargetOptions, TargetResult}; pub fn target() -> TargetResult { let mut base = super::android_base::opts(); - base.features = "+v7,+thumb2,+vfp3,+d16".to_string(); + base.features = "+v7,+thumb2,+vfp3,+d16,-neon".to_string(); base.max_atomic_width = Some(64); Ok(Target { From 9e01f76349ec358009099dd5dc87b921960defc2 Mon Sep 17 00:00:00 2001 From: Ralph Giles Date: Fri, 16 Dec 2016 11:27:12 -0800 Subject: [PATCH 03/10] rustc: Link to Android ABI requirements. Hopefully these references will be stable and provide guidance when requirements change in the future. --- src/librustc_back/target/aarch64_linux_android.rs | 3 +++ src/librustc_back/target/armv7_linux_androideabi.rs | 3 +++ src/librustc_back/target/i686_linux_android.rs | 3 +++ 3 files changed, 9 insertions(+) diff --git a/src/librustc_back/target/aarch64_linux_android.rs b/src/librustc_back/target/aarch64_linux_android.rs index 140195c780b9c..54eead94986cc 100644 --- a/src/librustc_back/target/aarch64_linux_android.rs +++ b/src/librustc_back/target/aarch64_linux_android.rs @@ -10,6 +10,9 @@ use target::{Target, TargetOptions, TargetResult}; +// See https://developer.android.com/ndk/guides/abis.html#arm64-v8a +// for target ABI requirements. + pub fn target() -> TargetResult { let mut base = super::android_base::opts(); base.max_atomic_width = Some(128); diff --git a/src/librustc_back/target/armv7_linux_androideabi.rs b/src/librustc_back/target/armv7_linux_androideabi.rs index 598b961a3048a..36f409b7948c2 100644 --- a/src/librustc_back/target/armv7_linux_androideabi.rs +++ b/src/librustc_back/target/armv7_linux_androideabi.rs @@ -10,6 +10,9 @@ use target::{Target, TargetOptions, TargetResult}; +// See https://developer.android.com/ndk/guides/abis.html#v7a +// for target ABI requirements. + pub fn target() -> TargetResult { let mut base = super::android_base::opts(); base.features = "+v7,+thumb2,+vfp3,+d16,-neon".to_string(); diff --git a/src/librustc_back/target/i686_linux_android.rs b/src/librustc_back/target/i686_linux_android.rs index a2c007d496960..f8a8f5a3500be 100644 --- a/src/librustc_back/target/i686_linux_android.rs +++ b/src/librustc_back/target/i686_linux_android.rs @@ -10,6 +10,9 @@ use target::{Target, TargetResult}; +// See https://developer.android.com/ndk/guides/abis.html#x86 +// for target ABI requirements. + pub fn target() -> TargetResult { let mut base = super::android_base::opts(); From 26d4308c6acbbde642be4621f5e3487b24ed8bd1 Mon Sep 17 00:00:00 2001 From: Andrew Paseltiner Date: Fri, 16 Dec 2016 18:10:09 -0500 Subject: [PATCH 04/10] Replace invalid use of `&mut` with `UnsafeCell` in `std::sync::mpsc` Closes #36934 --- src/libstd/sync/mpsc/mod.rs | 192 +++++++++------------ src/libstd/sync/mpsc/oneshot.rs | 293 +++++++++++++++++--------------- src/libstd/sync/mpsc/shared.rs | 106 ++++++------ src/libstd/sync/mpsc/stream.rs | 65 +++---- 4 files changed, 328 insertions(+), 328 deletions(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 9f51d3e87f3f7..0c3f31455cc41 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -348,7 +348,7 @@ impl !Sync for Sender { } /// owned by one thread, but it can be cloned to send to other threads. #[stable(feature = "rust1", since = "1.0.0")] pub struct SyncSender { - inner: Arc>>, + inner: Arc>, } #[stable(feature = "rust1", since = "1.0.0")] @@ -426,10 +426,10 @@ pub enum TrySendError { } enum Flavor { - Oneshot(Arc>>), - Stream(Arc>>), - Shared(Arc>>), - Sync(Arc>>), + Oneshot(Arc>), + Stream(Arc>), + Shared(Arc>), + Sync(Arc>), } #[doc(hidden)] @@ -487,7 +487,7 @@ impl UnsafeFlavor for Receiver { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn channel() -> (Sender, Receiver) { - let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); + let a = Arc::new(oneshot::Packet::new()); (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) } @@ -532,7 +532,7 @@ pub fn channel() -> (Sender, Receiver) { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { - let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); + let a = Arc::new(sync::Packet::new(bound)); (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) } @@ -578,38 +578,30 @@ impl Sender { pub fn send(&self, t: T) -> Result<(), SendError> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - unsafe { - let p = p.get(); - if !(*p).sent() { - return (*p).send(t).map_err(SendError); - } else { - let a = - Arc::new(UnsafeCell::new(stream::Packet::new())); - let rx = Receiver::new(Flavor::Stream(a.clone())); - match (*p).upgrade(rx) { - oneshot::UpSuccess => { - let ret = (*a.get()).send(t); - (a, ret) - } - oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(token) => { - // This send cannot panic because the thread is - // asleep (we're looking at it), so the receiver - // can't go away. - (*a.get()).send(t).ok().unwrap(); - token.signal(); - (a, Ok(())) - } + if !p.sent() { + return p.send(t).map_err(SendError); + } else { + let a = Arc::new(stream::Packet::new()); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match p.upgrade(rx) { + oneshot::UpSuccess => { + let ret = a.send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + a.send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) } } } } - Flavor::Stream(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, + Flavor::Stream(ref p) => return p.send(t).map_err(SendError), + Flavor::Shared(ref p) => return p.send(t).map_err(SendError), Flavor::Sync(..) => unreachable!(), }; @@ -624,41 +616,43 @@ impl Sender { #[stable(feature = "rust1", since = "1.0.0")] impl Clone for Sender { fn clone(&self) -> Sender { - let (packet, sleeper, guard) = match *unsafe { self.inner() } { + let packet = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { oneshot::UpSuccess | - oneshot::UpDisconnected => (a, None, guard), - oneshot::UpWoke(task) => (a, Some(task), guard) - } + oneshot::UpDisconnected => None, + oneshot::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Stream(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { stream::UpSuccess | - stream::UpDisconnected => (a, None, guard), - stream::UpWoke(task) => (a, Some(task), guard), - } + stream::UpDisconnected => None, + stream::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Shared(ref p) => { - unsafe { (*p.get()).clone_chan(); } + p.clone_chan(); return Sender::new(Flavor::Shared(p.clone())); } Flavor::Sync(..) => unreachable!(), }; unsafe { - (*packet.get()).inherit_blocker(sleeper, guard); - let tmp = Sender::new(Flavor::Shared(packet.clone())); mem::swap(self.inner_mut(), tmp.inner_mut()); } @@ -669,10 +663,10 @@ impl Clone for Sender { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for Sender { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_chan(), + Flavor::Stream(ref p) => p.drop_chan(), + Flavor::Shared(ref p) => p.drop_chan(), Flavor::Sync(..) => unreachable!(), } } @@ -690,7 +684,7 @@ impl fmt::Debug for Sender { //////////////////////////////////////////////////////////////////////////////// impl SyncSender { - fn new(inner: Arc>>) -> SyncSender { + fn new(inner: Arc>) -> SyncSender { SyncSender { inner: inner } } @@ -710,7 +704,7 @@ impl SyncSender { /// information. #[stable(feature = "rust1", since = "1.0.0")] pub fn send(&self, t: T) -> Result<(), SendError> { - unsafe { (*self.inner.get()).send(t).map_err(SendError) } + self.inner.send(t).map_err(SendError) } /// Attempts to send a value on this channel without blocking. @@ -724,14 +718,14 @@ impl SyncSender { /// receiver has received the data or not if this function is successful. #[stable(feature = "rust1", since = "1.0.0")] pub fn try_send(&self, t: T) -> Result<(), TrySendError> { - unsafe { (*self.inner.get()).try_send(t) } + self.inner.try_send(t) } } #[stable(feature = "rust1", since = "1.0.0")] impl Clone for SyncSender { fn clone(&self) -> SyncSender { - unsafe { (*self.inner.get()).clone_chan(); } + self.inner.clone_chan(); SyncSender::new(self.inner.clone()) } } @@ -739,7 +733,7 @@ impl Clone for SyncSender { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for SyncSender { fn drop(&mut self) { - unsafe { (*self.inner.get()).drop_chan(); } + self.inner.drop_chan(); } } @@ -772,7 +766,7 @@ impl Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(oneshot::Empty) => return Err(TryRecvError::Empty), Err(oneshot::Disconnected) => { @@ -782,7 +776,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(stream::Empty) => return Err(TryRecvError::Empty), Err(stream::Disconnected) => { @@ -792,7 +786,7 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(shared::Empty) => return Err(TryRecvError::Empty), Err(shared::Disconnected) => { @@ -801,7 +795,7 @@ impl Receiver { } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(sync::Empty) => return Err(TryRecvError::Empty), Err(sync::Disconnected) => { @@ -875,7 +869,7 @@ impl Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(RecvError), Err(oneshot::Upgraded(rx)) => rx, @@ -883,7 +877,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(RecvError), Err(stream::Upgraded(rx)) => rx, @@ -891,15 +885,13 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(RecvError), Err(shared::Empty) => unreachable!(), } } - Flavor::Sync(ref p) => return unsafe { - (*p.get()).recv(None).map_err(|_| RecvError) - } + Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), }; unsafe { mem::swap(self.inner_mut(), new_port.inner_mut()); @@ -952,7 +944,7 @@ impl Receiver { loop { let port_or_empty = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => Some(rx), @@ -960,7 +952,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => Some(rx), @@ -968,14 +960,14 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(Disconnected), Err(shared::Empty) => None, } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(sync::Disconnected) => return Err(Disconnected), Err(sync::Empty) => None, @@ -1020,23 +1012,19 @@ impl select::Packet for Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } + Flavor::Shared(ref p) => return p.can_recv(), + Flavor::Sync(ref p) => return p.can_recv(), }; unsafe { mem::swap(self.inner_mut(), @@ -1049,25 +1037,21 @@ impl select::Packet for Receiver { loop { let (t, new_port) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { oneshot::SelSuccess => return Installed, oneshot::SelCanceled => return Abort, oneshot::SelUpgraded(t, rx) => (t, rx), } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { stream::SelSuccess => return Installed, stream::SelCanceled => return Abort, stream::SelUpgraded(t, rx) => (t, rx), } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } + Flavor::Shared(ref p) => return p.start_selection(token), + Flavor::Sync(ref p) => return p.start_selection(token), }; token = t; unsafe { @@ -1080,16 +1064,10 @@ impl select::Packet for Receiver { let mut was_upgrade = false; loop { let result = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, - Flavor::Stream(ref p) => unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Sync(ref p) => return unsafe { - (*p.get()).abort_selection() - }, + Flavor::Oneshot(ref p) => p.abort_selection(), + Flavor::Stream(ref p) => p.abort_selection(was_upgrade), + Flavor::Shared(ref p) => return p.abort_selection(was_upgrade), + Flavor::Sync(ref p) => return p.abort_selection(), }; let new_port = match result { Ok(b) => return b, Err(p) => p }; was_upgrade = true; @@ -1142,11 +1120,11 @@ impl IntoIterator for Receiver { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for Receiver { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_port(), + Flavor::Stream(ref p) => p.drop_port(), + Flavor::Shared(ref p) => p.drop_port(), + Flavor::Sync(ref p) => p.drop_port(), } } } diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs index 767e9f96ac8e4..b8e50c9297b64 100644 --- a/src/libstd/sync/mpsc/oneshot.rs +++ b/src/libstd/sync/mpsc/oneshot.rs @@ -39,7 +39,8 @@ use self::MyUpgrade::*; use sync::mpsc::Receiver; use sync::mpsc::blocking::{self, SignalToken}; -use core::mem; +use cell::UnsafeCell; +use ptr; use sync::atomic::{AtomicUsize, Ordering}; use time::Instant; @@ -57,10 +58,10 @@ pub struct Packet { // Internal state of the chan/port pair (stores the blocked thread as well) state: AtomicUsize, // One-shot data slot location - data: Option, + data: UnsafeCell>, // when used for the second time, a oneshot channel must be upgraded, and // this contains the slot for the upgrade - upgrade: MyUpgrade, + upgrade: UnsafeCell>, } pub enum Failure { @@ -90,42 +91,44 @@ enum MyUpgrade { impl Packet { pub fn new() -> Packet { Packet { - data: None, - upgrade: NothingSent, + data: UnsafeCell::new(None), + upgrade: UnsafeCell::new(NothingSent), state: AtomicUsize::new(EMPTY), } } - pub fn send(&mut self, t: T) -> Result<(), T> { - // Sanity check - match self.upgrade { - NothingSent => {} - _ => panic!("sending on a oneshot that's already sent on "), - } - assert!(self.data.is_none()); - self.data = Some(t); - self.upgrade = SendUsed; - - match self.state.swap(DATA, Ordering::SeqCst) { - // Sent the data, no one was waiting - EMPTY => Ok(()), - - // Couldn't send the data, the port hung up first. Return the data - // back up the stack. - DISCONNECTED => { - self.state.swap(DISCONNECTED, Ordering::SeqCst); - self.upgrade = NothingSent; - Err(self.data.take().unwrap()) + pub fn send(&self, t: T) -> Result<(), T> { + unsafe { + // Sanity check + match *self.upgrade.get() { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), } + assert!((*self.data.get()).is_none()); + ptr::write(self.data.get(), Some(t)); + ptr::write(self.upgrade.get(), SendUsed); + + match self.state.swap(DATA, Ordering::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + self.state.swap(DISCONNECTED, Ordering::SeqCst); + ptr::write(self.upgrade.get(), NothingSent); + Err((&mut *self.data.get()).take().unwrap()) + } - // Not possible, these are one-use channels - DATA => unreachable!(), + // Not possible, these are one-use channels + DATA => unreachable!(), - // There is a thread waiting on the other end. We leave the 'DATA' - // state inside so it'll pick it up on the other end. - ptr => unsafe { - SignalToken::cast_from_usize(ptr).signal(); - Ok(()) + // There is a thread waiting on the other end. We leave the 'DATA' + // state inside so it'll pick it up on the other end. + ptr => { + SignalToken::cast_from_usize(ptr).signal(); + Ok(()) + } } } } @@ -133,13 +136,15 @@ impl Packet { // Just tests whether this channel has been sent on or not, this is only // safe to use from the sender. pub fn sent(&self) -> bool { - match self.upgrade { - NothingSent => false, - _ => true, + unsafe { + match *self.upgrade.get() { + NothingSent => false, + _ => true, + } } } - pub fn recv(&mut self, deadline: Option) -> Result> { + pub fn recv(&self, deadline: Option) -> Result> { // Attempt to not block the thread (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. if self.state.load(Ordering::SeqCst) == EMPTY { @@ -167,73 +172,77 @@ impl Packet { self.try_recv() } - pub fn try_recv(&mut self) -> Result> { - match self.state.load(Ordering::SeqCst) { - EMPTY => Err(Empty), - - // We saw some data on the channel, but the channel can be used - // again to send us an upgrade. As a result, we need to re-insert - // into the channel that there's no data available (otherwise we'll - // just see DATA next time). This is done as a cmpxchg because if - // the state changes under our feet we'd rather just see that state - // change. - DATA => { - self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); - match self.data.take() { - Some(data) => Ok(data), - None => unreachable!(), + pub fn try_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => unreachable!(), + } } - } - // There's no guarantee that we receive before an upgrade happens, - // and an upgrade flags the channel as disconnected, so when we see - // this we first need to check if there's data available and *then* - // we go through and process the upgrade. - DISCONNECTED => { - match self.data.take() { - Some(data) => Ok(data), - None => { - match mem::replace(&mut self.upgrade, SendUsed) { - SendUsed | NothingSent => Err(Disconnected), - GoUp(upgrade) => Err(Upgraded(upgrade)) + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => { + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => { + match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)) + } } } } - } - // We are the sole receiver; there cannot be a blocking - // receiver already. - _ => unreachable!() + // We are the sole receiver; there cannot be a blocking + // receiver already. + _ => unreachable!() + } } } // Returns whether the upgrade was completed. If the upgrade wasn't // completed, then the port couldn't get sent to the other half (it will // never receive it). - pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { - let prev = match self.upgrade { - NothingSent => NothingSent, - SendUsed => SendUsed, - _ => panic!("upgrading again"), - }; - self.upgrade = GoUp(up); - - match self.state.swap(DISCONNECTED, Ordering::SeqCst) { - // If the channel is empty or has data on it, then we're good to go. - // Senders will check the data before the upgrade (in case we - // plastered over the DATA state). - DATA | EMPTY => UpSuccess, - - // If the other end is already disconnected, then we failed the - // upgrade. Be sure to trash the port we were given. - DISCONNECTED => { self.upgrade = prev; UpDisconnected } - - // If someone's waiting, we gotta wake them up - ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) }) + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { + unsafe { + let prev = match *self.upgrade.get() { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + ptr::write(self.upgrade.get(), GoUp(up)); + + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected } + + // If someone's waiting, we gotta wake them up + ptr => UpWoke(SignalToken::cast_from_usize(ptr)) + } } } - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { DATA | DISCONNECTED | EMPTY => {} @@ -244,7 +253,7 @@ impl Packet { } } - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop @@ -254,7 +263,7 @@ impl Packet { // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). - DATA => { self.data.take().unwrap(); } + DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, // We're the only ones that can block on this port _ => unreachable!() @@ -267,62 +276,66 @@ impl Packet { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. - pub fn can_recv(&mut self) -> Result> { - match self.state.load(Ordering::SeqCst) { - EMPTY => Ok(false), // Welp, we tried - DATA => Ok(true), // we have some un-acquired data - DISCONNECTED if self.data.is_some() => Ok(true), // we have data - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => Err(upgrade), - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { self.upgrade = up; Ok(true) } + pub fn can_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Ok(false), // Welp, we tried + DATA => Ok(true), // we have some un-acquired data + DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data + DISCONNECTED => { + match ptr::replace(self.upgrade.get(), SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => Err(upgrade), + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { ptr::write(self.upgrade.get(), up); Ok(true) } + } } + _ => unreachable!(), // we're the "one blocker" } - _ => unreachable!(), // we're the "one blocker" } } // Attempts to start selection on this port. This can either succeed, fail // because there is data, or fail because there is an upgrade pending. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult { - let ptr = unsafe { token.cast_to_usize() }; - match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { - EMPTY => SelSuccess, - DATA => { - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled - } - DISCONNECTED if self.data.is_some() => { - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled - } - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => { - SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade) - } + pub fn start_selection(&self, token: SignalToken) -> SelectionResult { + unsafe { + let ptr = token.cast_to_usize(); + match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { + EMPTY => SelSuccess, + DATA => { + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } + DISCONNECTED if (*self.data.get()).is_some() => { + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } + DISCONNECTED => { + match ptr::replace(self.upgrade.get(), SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => { + SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade) + } - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { - self.upgrade = up; - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { + ptr::write(self.upgrade.get(), up); + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } } } + _ => unreachable!(), // we're the "one blocker" } - _ => unreachable!(), // we're the "one blocker" } } @@ -330,7 +343,7 @@ impl Packet { // blocked thread will no longer be visible to any other threads. // // The return value indicates whether there's data on this port. - pub fn abort_selection(&mut self) -> Result> { + pub fn abort_selection(&self) -> Result> { let state = match self.state.load(Ordering::SeqCst) { // Each of these states means that no further activity will happen // with regard to abortion selection @@ -356,16 +369,16 @@ impl Packet { // // We then need to check to see if there was an upgrade requested, // and if so, the upgraded port needs to have its selection aborted. - DISCONNECTED => { - if self.data.is_some() { + DISCONNECTED => unsafe { + if (*self.data.get()).is_some() { Ok(true) } else { - match mem::replace(&mut self.upgrade, SendUsed) { + match ptr::replace(self.upgrade.get(), SendUsed) { GoUp(port) => Err(port), _ => Ok(true), } } - } + }, // We woke ourselves up from select. ptr => unsafe { diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs index 2a9618251ff52..f9e0290416432 100644 --- a/src/libstd/sync/mpsc/shared.rs +++ b/src/libstd/sync/mpsc/shared.rs @@ -24,6 +24,8 @@ use core::cmp; use core::intrinsics::abort; use core::isize; +use cell::UnsafeCell; +use ptr; use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}; use sync::mpsc::blocking::{self, SignalToken}; use sync::mpsc::mpsc_queue as mpsc; @@ -44,7 +46,7 @@ const MAX_STEALS: isize = 1 << 20; pub struct Packet { queue: mpsc::Queue, cnt: AtomicIsize, // How many items are on this channel - steals: isize, // How many times has a port received without blocking? + steals: UnsafeCell, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for wake up // The number of channels which are currently using this packet. @@ -72,7 +74,7 @@ impl Packet { Packet { queue: mpsc::Queue::new(), cnt: AtomicIsize::new(0), - steals: 0, + steals: UnsafeCell::new(0), to_wake: AtomicUsize::new(0), channels: AtomicUsize::new(2), port_dropped: AtomicBool::new(false), @@ -95,7 +97,7 @@ impl Packet { // threads in select(). // // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, + pub fn inherit_blocker(&self, token: Option, guard: MutexGuard<()>) { token.map(|token| { @@ -122,7 +124,7 @@ impl Packet { // To offset this bad increment, we initially set the steal count to // -1. You'll find some special code in abort_selection() as well to // ensure that this -1 steal count doesn't escape too far. - self.steals = -1; + unsafe { *self.steals.get() = -1; } }); // When the shared packet is constructed, we grabbed this lock. The @@ -133,7 +135,7 @@ impl Packet { drop(guard); } - pub fn send(&mut self, t: T) -> Result<(), T> { + pub fn send(&self, t: T) -> Result<(), T> { // See Port::drop for what's going on if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } @@ -218,7 +220,7 @@ impl Packet { Ok(()) } - pub fn recv(&mut self, deadline: Option) -> Result { + pub fn recv(&self, deadline: Option) -> Result { // This code is essentially the exact same as that found in the stream // case (see stream.rs) match self.try_recv() { @@ -239,37 +241,38 @@ impl Packet { } match self.try_recv() { - data @ Ok(..) => { self.steals -= 1; data } + data @ Ok(..) => unsafe { *self.steals.get() -= 1; data }, data => data, } } // Essentially the exact same thing as the stream decrement function. // Returns true if blocking should proceed. - fn decrement(&mut self, token: SignalToken) -> StartResult { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); - let ptr = unsafe { token.cast_to_usize() }; - self.to_wake.store(ptr, Ordering::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Installed } + fn decrement(&self, token: SignalToken) -> StartResult { + unsafe { + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + let ptr = token.cast_to_usize(); + self.to_wake.store(ptr, Ordering::SeqCst); + + let steals = ptr::replace(self.steals.get(), 0); + + match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Installed } + } } - } - self.to_wake.store(0, Ordering::SeqCst); - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - Abort + self.to_wake.store(0, Ordering::SeqCst); + drop(SignalToken::cast_from_usize(ptr)); + Abort + } } - pub fn try_recv(&mut self) -> Result { + pub fn try_recv(&self) -> Result { let ret = match self.queue.pop() { mpsc::Data(t) => Some(t), mpsc::Empty => None, @@ -303,23 +306,23 @@ impl Packet { match ret { // See the discussion in the stream implementation for why we // might decrement steals. - Some(data) => { - if self.steals > MAX_STEALS { + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { match self.cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, self.steals); - self.steals -= m; + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; self.bump(n - m); } } - assert!(self.steals >= 0); + assert!(*self.steals.get() >= 0); } - self.steals += 1; + *self.steals.get() += 1; Ok(data) - } + }, // See the discussion in the stream implementation for why we try // again. @@ -341,7 +344,7 @@ impl Packet { // Prepares this shared packet for a channel clone, essentially just bumping // a refcount. - pub fn clone_chan(&mut self) { + pub fn clone_chan(&self) { let old_count = self.channels.fetch_add(1, Ordering::SeqCst); // See comments on Arc::clone() on why we do this (for `mem::forget`). @@ -355,7 +358,7 @@ impl Packet { // Decrement the reference count on a channel. This is called whenever a // Chan is dropped and may end up waking up a receiver. It's the receiver's // responsibility on the other end to figure out that we've disconnected. - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { match self.channels.fetch_sub(1, Ordering::SeqCst) { 1 => {} n if n > 1 => return, @@ -371,9 +374,9 @@ impl Packet { // See the long discussion inside of stream.rs for why the queue is drained, // and why it is done in this fashion. - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { self.port_dropped.store(true, Ordering::SeqCst); - let mut steals = self.steals; + let mut steals = unsafe { *self.steals.get() }; while { let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst); cnt != DISCONNECTED && cnt != steals @@ -390,7 +393,7 @@ impl Packet { } // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { + fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); self.to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); @@ -406,13 +409,13 @@ impl Packet { // // This is different than the stream version because there's no need to peek // at the queue, we can just look at the local count. - pub fn can_recv(&mut self) -> bool { + pub fn can_recv(&self) -> bool { let cnt = self.cnt.load(Ordering::SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 + cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0 } // increment the count on the channel (used for selection) - fn bump(&mut self, amt: isize) -> isize { + fn bump(&self, amt: isize) -> isize { match self.cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); @@ -427,7 +430,7 @@ impl Packet { // // The code here is the same as in stream.rs, except that it doesn't need to // peek at the channel to see if an upgrade is pending. - pub fn start_selection(&mut self, token: SignalToken) -> StartResult { + pub fn start_selection(&self, token: SignalToken) -> StartResult { match self.decrement(token) { Installed => Installed, Abort => { @@ -443,7 +446,7 @@ impl Packet { // // This is similar to the stream implementation (hence fewer comments), but // uses a different value for the "steals" variable. - pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { + pub fn abort_selection(&self, _was_upgrade: bool) -> bool { // Before we do anything else, we bounce on this lock. The reason for // doing this is to ensure that any upgrade-in-progress is gone and // done with. Without this bounce, we can race with inherit_blocker @@ -477,12 +480,15 @@ impl Packet { thread::yield_now(); } } - // if the number of steals is -1, it was the pre-emptive -1 steal - // count from when we inherited a blocker. This is fine because - // we're just going to overwrite it with a real value. - assert!(self.steals == 0 || self.steals == -1); - self.steals = steals; - prev >= 0 + unsafe { + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + let old = self.steals.get(); + assert!(*old == 0 || *old == -1); + *old = steals; + prev >= 0 + } } } } diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs index 61c8316467d9a..47cd8977fda23 100644 --- a/src/libstd/sync/mpsc/stream.rs +++ b/src/libstd/sync/mpsc/stream.rs @@ -22,8 +22,10 @@ pub use self::UpgradeResult::*; pub use self::SelectionResult::*; use self::Message::*; +use cell::UnsafeCell; use core::cmp; use core::isize; +use ptr; use thread; use time::Instant; @@ -42,7 +44,7 @@ pub struct Packet { queue: spsc::Queue>, // internal queue for all message cnt: AtomicIsize, // How many items are on this channel - steals: isize, // How many times has a port received without blocking? + steals: UnsafeCell, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. @@ -79,14 +81,14 @@ impl Packet { queue: unsafe { spsc::Queue::new(128) }, cnt: AtomicIsize::new(0), - steals: 0, + steals: UnsafeCell::new(0), to_wake: AtomicUsize::new(0), port_dropped: AtomicBool::new(false), } } - pub fn send(&mut self, t: T) -> Result<(), T> { + pub fn send(&self, t: T) -> Result<(), T> { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. @@ -99,7 +101,7 @@ impl Packet { Ok(()) } - pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { // If the port has gone away, then there's no need to proceed any // further. if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected } @@ -107,7 +109,7 @@ impl Packet { self.do_send(GoUp(up)) } - fn do_send(&mut self, t: Message) -> UpgradeResult { + fn do_send(&self, t: Message) -> UpgradeResult { self.queue.push(t); match self.cnt.fetch_add(1, Ordering::SeqCst) { // As described in the mod's doc comment, -1 == wakeup @@ -141,7 +143,7 @@ impl Packet { } // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { + fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); self.to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); @@ -151,13 +153,12 @@ impl Packet { // Decrements the count on the channel for a sleeper, returning the sleeper // back if it shouldn't sleep. Note that this is the location where we take // steals into account. - fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> { + fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); let ptr = unsafe { token.cast_to_usize() }; self.to_wake.store(ptr, Ordering::SeqCst); - let steals = self.steals; - self.steals = 0; + let steals = unsafe { ptr::replace(self.steals.get(), 0) }; match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } @@ -173,7 +174,7 @@ impl Packet { Err(unsafe { SignalToken::cast_from_usize(ptr) }) } - pub fn recv(&mut self, deadline: Option) -> Result> { + pub fn recv(&self, deadline: Option) -> Result> { // Optimistic preflight check (scheduling is expensive). match self.try_recv() { Err(Empty) => {} @@ -199,16 +200,16 @@ impl Packet { // a steal, so offset the decrement here (we already have our // "steal" factored into the channel count above). data @ Ok(..) | - data @ Err(Upgraded(..)) => { - self.steals -= 1; + data @ Err(Upgraded(..)) => unsafe { + *self.steals.get() -= 1; data - } + }, data => data, } } - pub fn try_recv(&mut self) -> Result> { + pub fn try_recv(&self) -> Result> { match self.queue.pop() { // If we stole some data, record to that effect (this will be // factored into cnt later on). @@ -221,26 +222,26 @@ impl Packet { // a pretty slow operation, of swapping 0 into cnt, taking steals // down as much as possible (without going negative), and then // adding back in whatever we couldn't factor into steals. - Some(data) => { - if self.steals > MAX_STEALS { + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { match self.cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, self.steals); - self.steals -= m; + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; self.bump(n - m); } } - assert!(self.steals >= 0); + assert!(*self.steals.get() >= 0); } - self.steals += 1; + *self.steals.get() += 1; match data { Data(t) => Ok(t), GoUp(up) => Err(Upgraded(up)), } - } + }, None => { match self.cnt.load(Ordering::SeqCst) { @@ -269,7 +270,7 @@ impl Packet { } } - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { @@ -279,7 +280,7 @@ impl Packet { } } - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { // Dropping a port seems like a fairly trivial thing. In theory all we // need to do is flag that we're disconnected and then everything else // can take over (we don't have anyone to wake up). @@ -309,7 +310,7 @@ impl Packet { // continue to fail while active senders send data while we're dropping // data, but eventually we're guaranteed to break out of this loop // (because there is a bounded number of senders). - let mut steals = self.steals; + let mut steals = unsafe { *self.steals.get() }; while { let cnt = self.cnt.compare_and_swap( steals, DISCONNECTED, Ordering::SeqCst); @@ -332,7 +333,7 @@ impl Packet { // Tests to see whether this port can receive without blocking. If Ok is // returned, then that's the answer. If Err is returned, then the returned // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&mut self) -> Result> { + pub fn can_recv(&self) -> Result> { // We peek at the queue to see if there's anything on it, and we use // this return value to determine if we should pop from the queue and // upgrade this channel immediately. If it looks like we've got an @@ -351,7 +352,7 @@ impl Packet { } // increment the count on the channel (used for selection) - fn bump(&mut self, amt: isize) -> isize { + fn bump(&self, amt: isize) -> isize { match self.cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); @@ -363,7 +364,7 @@ impl Packet { // Attempts to start selecting on this port. Like a oneshot, this can fail // immediately because of an upgrade. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult { + pub fn start_selection(&self, token: SignalToken) -> SelectionResult { match self.decrement(token) { Ok(()) => SelSuccess, Err(token) => { @@ -387,7 +388,7 @@ impl Packet { } // Removes a previous thread from being blocked in this port - pub fn abort_selection(&mut self, + pub fn abort_selection(&self, was_upgrade: bool) -> Result> { // If we're aborting selection after upgrading from a oneshot, then // we're guarantee that no one is waiting. The only way that we could @@ -403,7 +404,7 @@ impl Packet { // this end. This is fine because we know it's a small bounded windows // of time until the data is actually sent. if was_upgrade { - assert_eq!(self.steals, 0); + assert_eq!(unsafe { *self.steals.get() }, 0); assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); return Ok(true) } @@ -444,8 +445,10 @@ impl Packet { thread::yield_now(); } } - assert_eq!(self.steals, 0); - self.steals = steals; + unsafe { + assert_eq!(*self.steals.get(), 0); + *self.steals.get() = steals; + } // if we were previously positive, then there's surely data to // receive From ca376049ca8204fa51c36dc7d4bf4991e972660c Mon Sep 17 00:00:00 2001 From: "Zack M. Davis" Date: Fri, 16 Dec 2016 17:45:59 -0800 Subject: [PATCH 05/10] tidy features: use 2-parameter form of internal try macro for open err This tiny patch merely applies @bluss's suggestion for how to get a more informative error message when the feature check can't open a file, a matter that had briefly annoyed the present author, leading to the filing of #38417. --- src/tools/tidy/src/features.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tools/tidy/src/features.rs b/src/tools/tidy/src/features.rs index 4ef07f7e4b896..ac5dff0980c08 100644 --- a/src/tools/tidy/src/features.rs +++ b/src/tools/tidy/src/features.rs @@ -67,7 +67,7 @@ pub fn check(path: &Path, bad: &mut bool) { } contents.truncate(0); - t!(t!(File::open(file)).read_to_string(&mut contents)); + t!(t!(File::open(&file), &file).read_to_string(&mut contents)); for (i, line) in contents.lines().enumerate() { let mut err = |msg: &str| { From c3423a6640090c58557f9cb86e22c50ebb67ca87 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 17 Dec 2016 13:47:31 -0800 Subject: [PATCH 06/10] Update beta bootstrap compiler The current beta that rustc is bootstrapping from contains a bug in Cargo that erroneously links to OpenSSL in /usr/local, but this is fixed in the most recent 1.14 beta, so let's use that. --- src/stage0.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stage0.txt b/src/stage0.txt index 223974186b1df..2048f711c21a4 100644 --- a/src/stage0.txt +++ b/src/stage0.txt @@ -12,5 +12,5 @@ # tarball for a stable release you'll likely see `1.x.0-$date` where `1.x.0` was # released on `$date` -rustc: beta-2016-11-16 +rustc: beta-2016-12-16 cargo: nightly-2016-11-16 From 1e7bc907086c7eb5415f020ecde3fe7020d5b360 Mon Sep 17 00:00:00 2001 From: est31 Date: Sun, 18 Dec 2016 03:12:18 +0100 Subject: [PATCH 07/10] stage0.txt: typo fix --- src/stage0.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stage0.txt b/src/stage0.txt index 223974186b1df..96568091acbc7 100644 --- a/src/stage0.txt +++ b/src/stage0.txt @@ -2,7 +2,7 @@ # compiler itself. For the rustbuild build system, this also describes the # relevant Cargo revision that we're using. # -# Currently Rust always bootstrap from the previous stable release, and in our +# Currently Rust always bootstraps from the previous stable release, and in our # train model this means that the master branch bootstraps from beta, beta # bootstraps from current stable, and stable bootstraps from the previous stable # release. From 79e8a70b62f15ca56489a751cd3184f8acefcae9 Mon Sep 17 00:00:00 2001 From: Guillaume Gomez Date: Sat, 17 Dec 2016 18:09:05 +0100 Subject: [PATCH 08/10] Add missing urls for thread doc module --- src/libstd/thread/mod.rs | 62 ++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index 255cd2a9bc0f1..55adc3dabf40f 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -17,13 +17,11 @@ //! provide some built-in support for low-level synchronization. //! //! Communication between threads can be done through -//! [channels](../../std/sync/mpsc/index.html), Rust's message-passing -//! types, along with [other forms of thread +//! [channels], Rust's message-passing types, along with [other forms of thread //! synchronization](../../std/sync/index.html) and shared-memory data //! structures. In particular, types that are guaranteed to be //! threadsafe are easily shared between threads using the -//! atomically-reference-counted container, -//! [`Arc`](../../std/sync/struct.Arc.html). +//! atomically-reference-counted container, [`Arc`]. //! //! Fatal logic errors in Rust cause *thread panic*, during which //! a thread will unwind the stack, running destructors and freeing @@ -40,7 +38,7 @@ //! //! ## Spawning a thread //! -//! A new thread can be spawned using the `thread::spawn` function: +//! A new thread can be spawned using the [`thread::spawn`][`spawn`] function: //! //! ```rust //! use std::thread; @@ -55,7 +53,7 @@ //! it), unless this parent is the main thread. //! //! The parent thread can also wait on the completion of the child -//! thread; a call to `spawn` produces a `JoinHandle`, which provides +//! thread; a call to [`spawn`] produces a [`JoinHandle`], which provides //! a `join` method for waiting: //! //! ```rust @@ -68,13 +66,13 @@ //! let res = child.join(); //! ``` //! -//! The `join` method returns a `Result` containing `Ok` of the final -//! value produced by the child thread, or `Err` of the value given to -//! a call to `panic!` if the child panicked. +//! The [`join`] method returns a [`Result`] containing [`Ok`] of the final +//! value produced by the child thread, or [`Err`] of the value given to +//! a call to [`panic!`] if the child panicked. //! //! ## Configuring threads //! -//! A new thread can be configured before it is spawned via the `Builder` type, +//! A new thread can be configured before it is spawned via the [`Builder`] type, //! which currently allows you to set the name and stack size for the child thread: //! //! ```rust @@ -88,43 +86,43 @@ //! //! ## The `Thread` type //! -//! Threads are represented via the `Thread` type, which you can get in one of +//! Threads are represented via the [`Thread`] type, which you can get in one of //! two ways: //! -//! * By spawning a new thread, e.g. using the `thread::spawn` function, and -//! calling `thread()` on the `JoinHandle`. -//! * By requesting the current thread, using the `thread::current` function. +//! * By spawning a new thread, e.g. using the [`thread::spawn`][`spawn`] +//! function, and calling [`thread()`] on the [`JoinHandle`]. +//! * By requesting the current thread, using the [`thread::current()`] function. //! -//! The `thread::current()` function is available even for threads not spawned +//! The [`thread::current()`] function is available even for threads not spawned //! by the APIs of this module. //! //! ## Blocking support: park and unpark //! //! Every thread is equipped with some basic low-level blocking support, via the -//! `thread::park()` function and `thread::Thread::unpark()` method. `park()` -//! blocks the current thread, which can then be resumed from another thread by -//! calling the `unpark()` method on the blocked thread's handle. +//! [`thread::park()`][`park()`] function and [`thread::Thread::unpark()`][`unpark()`] +//! method. [`park()`] blocks the current thread, which can then be resumed from +//! another thread by calling the [`unpark()`] method on the blocked thread's handle. //! -//! Conceptually, each `Thread` handle has an associated token, which is +//! Conceptually, each [`Thread`] handle has an associated token, which is //! initially not present: //! -//! * The `thread::park()` function blocks the current thread unless or until +//! * The [`thread::park()`][`park()`] function blocks the current thread unless or until //! the token is available for its thread handle, at which point it atomically //! consumes the token. It may also return *spuriously*, without consuming the -//! token. `thread::park_timeout()` does the same, but allows specifying a +//! token. [`thread::park_timeout()`] does the same, but allows specifying a //! maximum time to block the thread for. //! -//! * The `unpark()` method on a `Thread` atomically makes the token available +//! * The [`unpark()`] method on a [`Thread`] atomically makes the token available //! if it wasn't already. //! -//! In other words, each `Thread` acts a bit like a semaphore with initial count +//! In other words, each [`Thread`] acts a bit like a semaphore with initial count //! 0, except that the semaphore is *saturating* (the count cannot go above 1), //! and can return spuriously. //! //! The API is typically used by acquiring a handle to the current thread, //! placing that handle in a shared data structure so that other threads can //! find it, and then `park`ing. When some desired condition is met, another -//! thread calls `unpark` on the handle. +//! thread calls [`unpark()`] on the handle. //! //! The motivation for this design is twofold: //! @@ -149,6 +147,22 @@ //! will want to make use of some form of **interior mutability** through the //! [`Cell`] or [`RefCell`] types. //! +//! [channels]: ../../std/sync/mpsc/index.html +//! [`Arc`]: ../../std/sync/struct.Arc.html +//! [`spawn`]: ../../std/thread/fn.spawn.html +//! [`JoinHandle`]: ../../std/thread/struct.JoinHandle.html +//! [`thread()`]: ../../std/thread/struct.JoinHandle.html#method.thread +//! [`join`]: ../../std/thread/struct.JoinHandle.html#method.join +//! [`Result`]: ../../std/result/enum.Result.html +//! [`Ok`]: ../../std/result/enum.Result.html#variant.Ok +//! [`Err`]: ../../std/result/enum.Result.html#variant.Err +//! [`panic!`]: ../../std/macro.panic.html +//! [`Builder`]: ../../std/thread/struct.Builder.html +//! [`thread::current()`]: ../../std/thread/fn.spawn.html +//! [`Thread`]: ../../std/thread/struct.Thread.html +//! [`park()`]: ../../std/thread/fn.park.html +//! [`unpark()`]: ../../std/thread/struct.Thread.html#method.unpark +//! [`thread::park_timeout()`]: ../../std/thread/fn.park_timeout.html //! [`Cell`]: ../cell/struct.Cell.html //! [`RefCell`]: ../cell/struct.RefCell.html //! [`thread_local!`]: ../macro.thread_local.html From 4d392d355e49a9fd309765974d7a9d831e57172c Mon Sep 17 00:00:00 2001 From: Corey Farwell Date: Thu, 15 Dec 2016 16:07:37 -0500 Subject: [PATCH 09/10] Document platform-specific differences for `std::process::exit`. Fixes https://github.com/rust-lang/rust/issues/35046. --- src/libstd/process.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/libstd/process.rs b/src/libstd/process.rs index 858537dd2de12..2dcb8c2f1529c 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -828,6 +828,12 @@ impl Child { /// this function at a known point where there are no more destructors left /// to run. /// +/// ## Platform-specific behavior +/// +/// **Unix**: On Unix-like platforms, it is unlikely that all 32 bits of `exit` +/// will be visible to a parent process inspecting the exit code. On most +/// Unix-like platforms, only the eight least-significant bits are considered. +/// /// # Examples /// /// ``` @@ -835,6 +841,17 @@ impl Child { /// /// process::exit(0); /// ``` +/// +/// Due to [platform-specific behavior], the exit code for this example will be +/// `0` on Linux, but `256` on Windows: +/// +/// ```no_run +/// use std::process; +/// +/// process::exit(0x0f00); +/// ``` +/// +/// [platform-specific behavior]: #platform-specific-behavior #[stable(feature = "rust1", since = "1.0.0")] pub fn exit(code: i32) -> ! { ::sys_common::cleanup(); From b3b2f1b0d626e79c0a4dd8650042fb805b79a6c4 Mon Sep 17 00:00:00 2001 From: Simonas Kazlauskas Date: Sun, 18 Dec 2016 23:45:39 +0200 Subject: [PATCH 10/10] Use exec for the wrapper on UNIXes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This not only avoids the small – and unnecessary – constant overhead for each compiler invocation, but also helps somewhat by only having “correct” rustc processes to look for in `/proc/`. This also makes the wrapper behave effectively as a regular exec wrapper its intended to be. --- src/bootstrap/bin/rustc.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/bootstrap/bin/rustc.rs b/src/bootstrap/bin/rustc.rs index 20ff9d9af3c0e..c5684e69994ea 100644 --- a/src/bootstrap/bin/rustc.rs +++ b/src/bootstrap/bin/rustc.rs @@ -30,7 +30,7 @@ extern crate bootstrap; use std::env; use std::ffi::OsString; use std::path::PathBuf; -use std::process::Command; +use std::process::{Command, ExitStatus}; fn main() { let args = env::args_os().skip(1).collect::>(); @@ -180,8 +180,19 @@ fn main() { } // Actually run the compiler! - std::process::exit(match cmd.status() { - Ok(s) => s.code().unwrap_or(1), + std::process::exit(match exec_cmd(&mut cmd) { + Ok(s) => s.code().unwrap_or(0xfe), Err(e) => panic!("\n\nfailed to run {:?}: {}\n\n", cmd, e), }) } + +#[cfg(unix)] +fn exec_cmd(cmd: &mut Command) -> ::std::io::Result { + use std::os::unix::process::CommandExt; + Err(cmd.exec()) +} + +#[cfg(not(unix))] +fn exec_cmd(cmd: &mut Command) -> ::std::io::Result { + cmd.status() +}