Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inline Sender into Address #141

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 56 additions & 24 deletions src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use event_listener::EventListener;
use futures_sink::Sink;
use futures_util::FutureExt;

use crate::inbox::Chan;
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::send_future::ResolveToHandlerReturn;
use crate::{inbox, BroadcastFuture, Error, Handler, NameableSending, SendFuture};
use crate::{BroadcastFuture, Error, Handler, NameableSending, SendFuture};

/// An [`Address`] is a reference to an actor through which messages can be
/// sent. It can be cloned to create more addresses to the same actor.
Expand Down Expand Up @@ -62,11 +64,19 @@ use crate::{inbox, BroadcastFuture, Error, Handler, NameableSending, SendFuture}
/// of their priority. All actors must handle a message for it to be removed from the mailbox and
/// the length to decrease. This means that the backpressure provided by [`Address::broadcast`] will
/// wait for the slowest actor.
pub struct Address<A, Rc: RefCounter = Strong>(pub(crate) inbox::Sender<A, Rc>);
pub struct Address<A, Rc: RefCounter = Strong> {
pub(crate) inner: Arc<Chan<A>>,
pub(crate) rc: Rc,
}

impl<A, Rc: RefCounter> Debug for Address<A, Rc> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_tuple("Address").field(&self.0).finish()
let act = std::any::type_name::<A>();
f.debug_struct(&format!("Address<{}>", act))
.field("rx_count", &self.inner.receiver_count())
.field("tx_count", &self.inner.sender_count())
.field("rc", &self.rc)
.finish()
}
}

Expand All @@ -82,15 +92,21 @@ impl<A> Address<A, Strong> {
/// an actor will not be prevented from being dropped if only weak sinks, channels, and
/// addresses exist.
pub fn downgrade(&self) -> WeakAddress<A> {
Address(self.0.downgrade())
Address {
inner: self.inner.clone(),
rc: Weak::new(&self.inner),
}
}
}

/// Functions which apply only to addresses which can either be strong or weak.
impl<A> Address<A, Either> {
/// Converts this address into a weak address.
pub fn downgrade(&self) -> WeakAddress<A> {
Address(self.0.downgrade())
Address {
inner: self.inner.clone(),
rc: Weak::new(&self.inner),
}
}
}

Expand Down Expand Up @@ -124,19 +140,19 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
/// })
/// ```
pub fn is_connected(&self) -> bool {
self.0.is_connected()
self.inner.is_connected()
}

/// Returns the number of messages in the actor's mailbox. This will be the sum of broadcast
/// messages, priority messages, and ordered messages. It can be up to three times the capacity,
/// as the capacity is for each send type (broadcast, priority, and ordered).
pub fn len(&self) -> usize {
self.0.len()
self.inner.len()
}

/// The capacity of the actor's mailbox per send type (broadcast, priority, and ordered).
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
self.inner.capacity()
}

/// Returns whether the actor's mailbox is empty.
Expand All @@ -146,7 +162,10 @@ impl<A, Rc: RefCounter> Address<A, Rc> {

/// Convert this address into a generic address which can be weak or strong.
pub fn as_either(&self) -> Address<A, Either> {
Address(self.0.clone().into_either_rc())
Address {
inner: self.inner.clone(),
rc: self.rc.increment(&self.inner).into_either(),
}
}

/// Send a message to the actor. The message will, by default, have a priority of 0 and be sent
Expand All @@ -162,39 +181,39 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
message: M,
) -> SendFuture<
<A as Handler<M>>::Return,
NameableSending<A, <A as Handler<M>>::Return, Rc>,
NameableSending<A, <A as Handler<M>>::Return>,
ResolveToHandlerReturn,
>
where
M: Send + 'static,
A: Handler<M>,
{
SendFuture::sending_named(message, self.0.clone())
SendFuture::sending_named(message, self.inner.clone())
}

/// Send a message to all actors on this address.
///
/// For details, please see the documentation on [`BroadcastFuture`].
pub fn broadcast<M>(&self, msg: M) -> BroadcastFuture<A, M, Rc>
pub fn broadcast<M>(&self, msg: M) -> BroadcastFuture<A, M>
where
M: Clone + Sync + Send + 'static,
A: Handler<M, Return = ()>,
{
BroadcastFuture::new(msg, self.0.clone())
BroadcastFuture::new(msg, self.inner.clone())
}

/// Waits until this address becomes disconnected. Note that if this is called on a strong
/// address, it will only ever trigger if the actor calls [`Context::stop_self`](crate::Context::stop_self),
/// as the address would prevent the actor being dropped due to too few strong addresses.
pub fn join(&self) -> ActorJoinHandle {
ActorJoinHandle(self.0.disconnect_notice())
ActorJoinHandle(self.inner.disconnect_listener())
}

/// Returns true if this address and the other address point to the same actor. This is
/// distinct from the implementation of `PartialEq` as it ignores reference count type, which
/// must be the same for `PartialEq` to return `true`.
pub fn same_actor<Rc2: RefCounter>(&self, other: &Address<A, Rc2>) -> bool {
self.0.inner_ptr() == other.0.inner_ptr()
Arc::ptr_eq(&self.inner, &other.inner)
}

/// Converts this address into a sink that can be used to send messages to the actor. These
Expand Down Expand Up @@ -246,7 +265,10 @@ impl Future for ActorJoinHandle {
// Required because #[derive] adds an A: Clone bound
impl<A, Rc: RefCounter> Clone for Address<A, Rc> {
fn clone(&self) -> Self {
Address(self.0.clone())
Address {
inner: self.inner.clone(),
rc: self.rc.increment(&self.inner),
}
}
}

Expand All @@ -257,7 +279,7 @@ impl<A, Rc: RefCounter> Clone for Address<A, Rc> {
/// it wraps.
impl<A, Rc: RefCounter, Rc2: RefCounter> PartialEq<Address<A, Rc2>> for Address<A, Rc> {
fn eq(&self, other: &Address<A, Rc2>) -> bool {
(self.same_actor(other)) && (self.0.is_strong() == other.0.is_strong())
(self.same_actor(other)) && (self.rc.is_strong() == other.rc.is_strong())
}
}

Expand All @@ -269,23 +291,33 @@ impl<A, Rc: RefCounter> Eq for Address<A, Rc> {}
/// address will compare as greater than a weak one.
impl<A, Rc: RefCounter, Rc2: RefCounter> PartialOrd<Address<A, Rc2>> for Address<A, Rc> {
fn partial_cmp(&self, other: &Address<A, Rc2>) -> Option<Ordering> {
Some(match self.0.inner_ptr().cmp(&other.0.inner_ptr()) {
Ordering::Equal => self.0.is_strong().cmp(&other.0.is_strong()),
ord => ord,
})
Some(
match Arc::as_ptr(&self.inner).cmp(&Arc::as_ptr(&other.inner)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check, does this properly compare data ptrs and not vtables?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Owner

@Restioson Restioson Jul 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ptr::eq has this same issue, as far as I know. as_ptr might explicitly return the data ptr, but it might also return *const dyn _, which would not be enough to ensure data equality is being tested. See rust-lang/rust#80505 for more

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I guess this also needs to be changed then.

Ordering::Equal => self.rc.is_strong().cmp(&other.rc.is_strong()),
ord => ord,
},
)
}
}

impl<A, Rc: RefCounter> Ord for Address<A, Rc> {
fn cmp(&self, other: &Self) -> Ordering {
self.0.inner_ptr().cmp(&other.0.inner_ptr())
Arc::as_ptr(&self.inner).cmp(&Arc::as_ptr(&other.inner))
}
}

impl<A, Rc: RefCounter> Hash for Address<A, Rc> {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_usize(self.0.inner_ptr() as *const _ as usize);
state.write_u8(self.0.is_strong() as u8);
state.write_usize(Arc::as_ptr(&self.inner) as usize);
state.write_u8(self.rc.is_strong() as u8);
state.finish();
}
}

impl<A, Rc: RefCounter> Drop for Address<A, Rc> {
fn drop(&mut self) {
if self.rc.decrement(&self.inner) {
self.inner.shutdown_waiting_receivers()
}
}
}
44 changes: 21 additions & 23 deletions src/broadcast_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use futures_core::FusedFuture;
use futures_util::FutureExt;

use crate::envelope::BroadcastEnvelopeConcrete;
use crate::inbox::tx::TxRefCounter;
use crate::inbox::{SendFuture, SentMessage};
use crate::{inbox, Error, Handler};
use crate::inbox::{Chan, SendFuture, SentMessage};
use crate::{Error, Handler};

/// A [`Future`] that represents the state of broadcasting a message to all actors connected to an
/// [`Address`](crate::Address).
Expand All @@ -23,19 +22,16 @@ use crate::{inbox, Error, Handler};
/// case the mailbox of an actor is bounded, this future yields `Pending` until a slot for this
/// message is available.
#[must_use = "Futures do nothing unless polled"]
pub struct BroadcastFuture<A, M, Rc: TxRefCounter> {
inner: Inner<A, M, Rc>,
pub struct BroadcastFuture<A, M> {
inner: Inner<A, M>,
}

impl<A, M, Rc> BroadcastFuture<A, M, Rc>
where
Rc: TxRefCounter,
{
pub(crate) fn new(message: M, sender: inbox::Sender<A, Rc>) -> Self {
impl<A, M> BroadcastFuture<A, M> {
pub(crate) fn new(message: M, chan: Arc<Chan<A>>) -> Self {
Self {
inner: Inner::Initial {
message,
sender,
chan,
priority: None,
},
}
Expand All @@ -47,11 +43,13 @@ where
pub fn priority(self, priority: u32) -> Self {
match self.inner {
Inner::Initial {
message, sender, ..
message,
chan: sender,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: Remove this extra binding.

..
} => Self {
inner: Inner::Initial {
message,
sender,
chan: sender,
priority: Some(priority),
},
},
Expand All @@ -60,19 +58,18 @@ where
}
}

enum Inner<A, M, Rc: TxRefCounter> {
enum Inner<A, M> {
Initial {
message: M,
sender: inbox::Sender<A, Rc>,
chan: Arc<Chan<A>>,
priority: Option<u32>,
},
Sending(SendFuture<A, Rc>),
Sending(SendFuture<A>),
Done,
}

impl<A, M, Rc> Future for BroadcastFuture<A, M, Rc>
impl<A, M> Future for BroadcastFuture<A, M>
where
Rc: TxRefCounter,
M: Clone + Send + Sync + 'static + Unpin,
A: Handler<M, Return = ()>,
{
Expand All @@ -84,13 +81,15 @@ where
match mem::replace(&mut this.inner, Inner::Done) {
Inner::Initial {
message,
sender,
chan,
priority,
} => {
let envelope =
BroadcastEnvelopeConcrete::<A, M>::new(message, priority.unwrap_or(0));
this.inner =
Inner::Sending(sender.send(SentMessage::ToAllActors(Arc::new(envelope))));
this.inner = Inner::Sending(SendFuture::New {
chan,
msg: SentMessage::ToAllActors(Arc::new(envelope)),
});
this.poll_unpin(cx)
}
Inner::Sending(mut send_fut) => match send_fut.poll_unpin(cx) {
Expand All @@ -107,9 +106,8 @@ where
}
}

impl<A, M, Rc> FusedFuture for BroadcastFuture<A, M, Rc>
impl<A, M> FusedFuture for BroadcastFuture<A, M>
where
Rc: TxRefCounter,
Self: Future,
{
fn is_terminated(&self) -> bool {
Expand Down
36 changes: 23 additions & 13 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{mem, task};

Expand All @@ -12,7 +13,8 @@ use futures_util::FutureExt;

use crate::envelope::{HandlerSpan, Shutdown};
use crate::inbox::rx::{ReceiveFuture as InboxReceiveFuture, RxStrong};
use crate::inbox::ActorMessage;
use crate::inbox::tx::{TxStrong, TxWeak};
use crate::inbox::{ActorMessage, Chan};
use crate::{inbox, Actor, Address, Error, WeakAddress};

/// `Context` is used to control how the actor is managed and to get the actor's address from inside
Expand Down Expand Up @@ -61,14 +63,18 @@ impl<A: Actor> Context<A> {
///
/// ```
pub fn new(message_cap: Option<usize>) -> (Address<A>, Self) {
let (tx, rx) = inbox::new(message_cap);
let chan = Arc::new(Chan::new(message_cap));

let context = Context {
running: true,
mailbox: rx,
mailbox: inbox::Receiver::new(chan.clone()),
};
let address = Address {
rc: TxStrong::first(&chan),
inner: chan,
};

(Address(tx), context)
(address, context)
}

/// Stop this actor as soon as it has finished processing current message. This means that the
Expand All @@ -81,23 +87,27 @@ impl<A: Actor> Context<A> {
///
/// This is equivalent to calling [`Context::stop_self`] on all actors active on this address.
pub fn stop_all(&mut self) {
// We only need to shut down if there are still any strong senders left
if let Some(sender) = self.mailbox.sender() {
sender.stop_all_receivers();
}
self.mailbox.inner.shutdown_all_receivers()
}

/// Get an address to the current actor if there are still external addresses to the actor.
pub fn address(&self) -> Result<Address<A>, Error> {
self.mailbox
.sender()
.ok_or(Error::Disconnected)
.map(Address)
let chan = self.mailbox.inner.clone();

Ok(Address {
rc: TxStrong::try_new(&chan).ok_or(Error::Disconnected)?,
inner: chan,
})
}

/// Get a weak address to the current actor.
pub fn weak_address(&self) -> WeakAddress<A> {
Address(self.mailbox.weak_sender())
let chan = self.mailbox.inner.clone();

Address {
rc: TxWeak::new(&chan),
inner: chan,
}
}

/// Run the given actor's main loop, handling incoming messages to its mailbox.
Expand Down
Loading