From 65a7de49b256428e94df7a9f7be87687e1d25d49 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:08:02 +0100 Subject: [PATCH 1/4] Address review on stable-futures --- core/src/nodes/tasks/manager.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index b17b1cf394f..505e0ef81b3 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -67,7 +67,7 @@ pub struct Manager { /// `local_spawns` list instead. threads_pool: Option, - /// If no executor is available, we move tasks to this list, and futures are polled on the + /// If no executor is available, we move tasks to this set, and futures are polled on the /// current thread instead. local_spawns: FuturesUnordered + Send>>>, @@ -237,7 +237,12 @@ impl Manager { let msg = ToTaskMessage::HandlerEvent(event.clone()); match task.sender.start_send(msg) { Ok(()) => {}, - Err(ref err) if err.is_full() => {}, // TODO: somehow report to user? + Err(ref err) if err.is_full() => { + // Note that the user is expected to call `poll_ready_broadcast` beforehand, + // which returns `Poll::Ready` only if the channel isn't full. Reaching this + // path always indicates a mistake in the code. + log::warn!("start_broadcast called while channel was full"); + }, Err(_) => {}, } } From be0f18d162b6aa0e1d7c50d5bca20e59af5c4ff1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:50:33 +0100 Subject: [PATCH 2/4] Update core/src/nodes/tasks/manager.rs Co-Authored-By: Max Inden --- core/src/nodes/tasks/manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index 505e0ef81b3..76d88fe1e69 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -241,7 +241,7 @@ impl Manager { // Note that the user is expected to call `poll_ready_broadcast` beforehand, // which returns `Poll::Ready` only if the channel isn't full. Reaching this // path always indicates a mistake in the code. - log::warn!("start_broadcast called while channel was full"); + log::warn!("start_broadcast called while channel was full. Have you called `poll_ready_broadcast` before?"); }, Err(_) => {}, } @@ -475,4 +475,3 @@ impl fmt::Debug for ClosedTask { .finish() } } - From 5e0f219555443a342dfcf7ac076228aaa5980792 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 17:20:01 +0100 Subject: [PATCH 3/4] Use a different fix --- core/src/nodes/collection.rs | 16 ++++++--------- core/src/nodes/network.rs | 16 ++++++--------- core/src/nodes/tasks/manager.rs | 35 +++++++++++---------------------- core/src/nodes/tasks/mod.rs | 2 +- 4 files changed, 25 insertions(+), 44 deletions(-) diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 9e21281029e..93a9580d41f 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -356,20 +356,16 @@ where } } - /// Sends an event to all nodes. + /// Sends a message to all nodes. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.inner.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.inner.poll_ready_broadcast(cx) + self.inner.poll_broadcast(event, cx) } /// Adds an existing connection to a node to the collection. diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index 2f6634a1be7..3e3dd691000 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -845,20 +845,16 @@ where }) } - /// Start sending an event to all nodes. + /// Sends a message to all the tasks, including the pending ones. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.active_nodes.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.active_nodes.poll_ready_broadcast(cx) + self.active_nodes.poll_broadcast(event, cx) } /// Returns a list of all the peers we are currently connected to. diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index 76d88fe1e69..dbfe485a2b7 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -221,39 +221,28 @@ impl Manager { task_id } - /// Start sending an event to all the tasks, including the pending ones. + /// Sends a message to all the tasks, including the pending ones. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - /// - /// After starting a broadcast make sure to finish it with `complete_broadcast`, - /// otherwise starting another broadcast or sending an event directly to a - /// task would overwrite the pending broadcast. + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. #[must_use] - pub fn start_broadcast(&mut self, event: &I) + pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()> where I: Clone { for task in self.tasks.values_mut() { - let msg = ToTaskMessage::HandlerEvent(event.clone()); - match task.sender.start_send(msg) { - Ok(()) => {}, - Err(ref err) if err.is_full() => { - // Note that the user is expected to call `poll_ready_broadcast` beforehand, - // which returns `Poll::Ready` only if the channel isn't full. Reaching this - // path always indicates a mistake in the code. - log::warn!("start_broadcast called while channel was full. Have you called `poll_ready_broadcast` before?"); - }, - Err(_) => {}, + if let Poll::Pending = task.sender.poll_ready(cx) { + return Poll::Pending; } } - } - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { for task in self.tasks.values_mut() { - if let Poll::Pending = task.sender.poll_ready(cx) { - return Poll::Pending; + let msg = ToTaskMessage::HandlerEvent(event.clone()); + match task.sender.start_send(msg) { + Ok(()) => {}, + Err(ref err) if err.is_full() => + panic!("poll_ready returned Poll::Ready just above; qed"), + Err(_) => {}, } } diff --git a/core/src/nodes/tasks/mod.rs b/core/src/nodes/tasks/mod.rs index 2af4939c269..5275121fd45 100644 --- a/core/src/nodes/tasks/mod.rs +++ b/core/src/nodes/tasks/mod.rs @@ -29,7 +29,7 @@ //! an existing connection to a node should be driven forward (cf. //! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`] //! and messages can be sent to individual tasks or all (cf. -//! [`Manager::start_broadcast`]). Messages produces by tasks can be +//! [`Manager::poll_broadcast`]). Messages produces by tasks can be //! retrieved by polling the manager (cf. [`Manager::poll`]). mod error; From 55d69250063636801808b5561fde28783771969d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 17:22:09 +0100 Subject: [PATCH 4/4] Terminology --- core/src/nodes/collection.rs | 2 +- core/src/nodes/network.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 93a9580d41f..596ad3b1b30 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -356,7 +356,7 @@ where } } - /// Sends a message to all nodes. + /// Sends an event to all nodes. /// /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event /// has been sent to any node yet. diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index 3e3dd691000..43ef11ade7a 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -845,7 +845,7 @@ where }) } - /// Sends a message to all the tasks, including the pending ones. + /// Sends an event to all nodes. /// /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event /// has been sent to any node yet.