diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 9e21281029e..596ad3b1b30 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -358,18 +358,14 @@ where /// Sends an event to all nodes. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.inner.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.inner.poll_ready_broadcast(cx) + self.inner.poll_broadcast(event, cx) } /// Adds an existing connection to a node to the collection. diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index 2f6634a1be7..43ef11ade7a 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -845,20 +845,16 @@ where }) } - /// Start sending an event to all nodes. + /// Sends an event to all nodes. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.active_nodes.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.active_nodes.poll_ready_broadcast(cx) + self.active_nodes.poll_broadcast(event, cx) } /// Returns a list of all the peers we are currently connected to. diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index b17b1cf394f..dbfe485a2b7 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -67,7 +67,7 @@ pub struct Manager { /// `local_spawns` list instead. threads_pool: Option, - /// If no executor is available, we move tasks to this list, and futures are polled on the + /// If no executor is available, we move tasks to this set, and futures are polled on the /// current thread instead. local_spawns: FuturesUnordered + Send>>>, @@ -221,34 +221,28 @@ impl Manager { task_id } - /// Start sending an event to all the tasks, including the pending ones. + /// Sends a message to all the tasks, including the pending ones. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - /// - /// After starting a broadcast make sure to finish it with `complete_broadcast`, - /// otherwise starting another broadcast or sending an event directly to a - /// task would overwrite the pending broadcast. + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. #[must_use] - pub fn start_broadcast(&mut self, event: &I) + pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()> where I: Clone { for task in self.tasks.values_mut() { - let msg = ToTaskMessage::HandlerEvent(event.clone()); - match task.sender.start_send(msg) { - Ok(()) => {}, - Err(ref err) if err.is_full() => {}, // TODO: somehow report to user? - Err(_) => {}, + if let Poll::Pending = task.sender.poll_ready(cx) { + return Poll::Pending; } } - } - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { for task in self.tasks.values_mut() { - if let Poll::Pending = task.sender.poll_ready(cx) { - return Poll::Pending; + let msg = ToTaskMessage::HandlerEvent(event.clone()); + match task.sender.start_send(msg) { + Ok(()) => {}, + Err(ref err) if err.is_full() => + panic!("poll_ready returned Poll::Ready just above; qed"), + Err(_) => {}, } } @@ -470,4 +464,3 @@ impl fmt::Debug for ClosedTask { .finish() } } - diff --git a/core/src/nodes/tasks/mod.rs b/core/src/nodes/tasks/mod.rs index 2af4939c269..5275121fd45 100644 --- a/core/src/nodes/tasks/mod.rs +++ b/core/src/nodes/tasks/mod.rs @@ -29,7 +29,7 @@ //! an existing connection to a node should be driven forward (cf. //! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`] //! and messages can be sent to individual tasks or all (cf. -//! [`Manager::start_broadcast`]). Messages produces by tasks can be +//! [`Manager::poll_broadcast`]). Messages produces by tasks can be //! retrieved by polling the manager (cf. [`Manager::poll`]). mod error;