diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 8988235db..7fa8d1119 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -1506,3 +1506,76 @@ pub(crate) unsafe fn read(r: &Receiver, token: &mut Token) -> Result chan.read(token), } } + +/// Channel flavors that are reconnectable, ie, may survive +/// without senders. +/// +pub mod reconnectable { + // note: we need separate types because the At/Tick/Never + // channel types do not support this, but they all use the + // same exact Receiver type as array and list flavours. + + // The intended usage of this module is just to replace any `use crossbeam_channel::...` + // with `use crossbeam_channel::reconnectable::...` and have everything work out of the box. + + // note: these re-exports omit + // at, tick, never: cannot be supported + // bounded: may be added later in this module + pub use crate::channel::{IntoIter, Iter, TryIter}; + pub use crate::err::{ReadyTimeoutError, SelectTimeoutError, TryReadyError, TrySelectError}; + pub use crate::err::{RecvError, RecvTimeoutError, TryRecvError}; + pub use crate::err::{SendError, SendTimeoutError, TrySendError}; + pub use crate::select::{Select, SelectedOperation}; + pub use crate::Sender; + + use super::Receiver as NormalReceiver; + use super::{ReceiverFlavor, SenderFlavor}; + use std::ops::Deref; + + /// An [unbounded](super::unbounded) channel whose receiver + /// also is reconnectable. + pub fn unbounded() -> (Sender, Receiver) { + let (s, r) = super::unbounded(); + (s, Receiver(r)) + } + + /// A [receiver](super::Receiver) that can survive periods + /// of time where no [Sender]s are alive. New senders may + /// be created from the receiver directly ([Self::new_sender]). + /// The channel is only deallocated when the last receiver dies. + /// If there are live senders at that point, they start producing + /// [SendError]s as usual. + #[derive(Debug)] + pub struct Receiver(NormalReceiver); + + impl Receiver { + /// Returns a new sender that communicates with this + /// receiver. + pub fn new_sender(&self) -> Sender { + match &self.0.flavor { + ReceiverFlavor::Array(chan) => Sender { + flavor: SenderFlavor::Array( + chan.new_sender(|_| unimplemented!("but unreachable")), + ), + }, + ReceiverFlavor::List(chan) => Sender { + flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())), + }, + ReceiverFlavor::Zero(chan) => Sender { + flavor: SenderFlavor::Zero( + chan.new_sender(|_| unimplemented!("but unreachable")), + ), + }, + _ => unreachable!("This type cannot be built with at/never/tick"), + } + } + } + + impl Deref for self::Receiver { + type Target = NormalReceiver; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } +} diff --git a/crossbeam-channel/src/counter.rs b/crossbeam-channel/src/counter.rs index 2c27f7c6b..d641c869d 100644 --- a/crossbeam-channel/src/counter.rs +++ b/crossbeam-channel/src/counter.rs @@ -127,6 +127,18 @@ impl Receiver { } } } + + pub(crate) fn new_sender(&self, reconnect: impl FnOnce(&C) -> bool) -> Sender { + if 0 == self.counter().senders.fetch_add(1, Ordering::SeqCst) { + // we're the first sender to be created + reconnect(&self.counter().chan); + self.counter().destroy.store(false, Ordering::Relaxed); + } + + Sender { + counter: self.counter, + } + } } impl ops::Deref for Receiver { diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 5056aa431..5d0852769 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -544,6 +544,17 @@ impl Channel { } } + /// Reconnects senders. There is no need to notify threads + /// as nobody is currently blocking, since we were in an + /// idle phase. + /// + /// Returns `true` if this call reconnected the channel. + pub(crate) fn reconnect_senders(&self) -> bool { + let tail = self.tail.index.fetch_and(!MARK_BIT, Ordering::SeqCst); + + tail & MARK_BIT == 0 + } + /// Disconnects receivers. /// /// Returns `true` if this call disconnected the channel. diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index cc1ef112f..05afa560b 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -361,6 +361,7 @@ cfg_if! { pub use crate::channel::{bounded, unbounded}; pub use crate::channel::{IntoIter, Iter, TryIter}; pub use crate::channel::{Receiver, Sender}; + pub use crate::channel::reconnectable; pub use crate::select::{Select, SelectedOperation}; diff --git a/crossbeam-channel/tests/list.rs b/crossbeam-channel/tests/list.rs index 6673e4b24..38f3b6a4e 100644 --- a/crossbeam-channel/tests/list.rs +++ b/crossbeam-channel/tests/list.rs @@ -6,7 +6,7 @@ use std::sync::atomic::Ordering; use std::thread; use std::time::Duration; -use crossbeam_channel::{select, unbounded, Receiver}; +use crossbeam_channel::{reconnectable, select, unbounded, Receiver}; use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError}; use crossbeam_channel::{SendError, SendTimeoutError, TrySendError}; use crossbeam_utils::thread::scope; @@ -200,6 +200,23 @@ fn recv_after_disconnect() { assert_eq!(r.recv(), Err(RecvError)); } +#[test] +fn zero_receiver_revival() { + let (s, r) = reconnectable::unbounded(); + + s.send(1).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Err(RecvError)); + + let s = r.new_sender(); + + s.send(1).unwrap(); + assert_eq!(r.recv(), Ok(1)); +} + #[test] fn len() { let (s, r) = unbounded();