From 5dbcd29ad14ff0592f08aab94ad4208fd30f9697 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 13 Aug 2020 10:54:23 +1000 Subject: [PATCH] Separate idle/event messages --- .../network/src/router/gossip_processor.rs | 92 +++++++++++++------ 1 file changed, 66 insertions(+), 26 deletions(-) diff --git a/beacon_node/network/src/router/gossip_processor.rs b/beacon_node/network/src/router/gossip_processor.rs index 6cfc5693c4a..24e41f81883 100644 --- a/beacon_node/network/src/router/gossip_processor.rs +++ b/beacon_node/network/src/router/gossip_processor.rs @@ -115,15 +115,10 @@ impl LifoQueue { /// An event to be processed by the manager task. #[derive(Debug, PartialEq)] -pub enum Event { - /// A worker has finished its work and is idle/shutdown. - WorkerIdle, - /// There is some work to be done. - Work { - message_id: MessageId, - peer_id: PeerId, - work: Work, - }, +pub struct Event { + message_id: MessageId, + peer_id: PeerId, + work: Work, } impl Event { @@ -135,7 +130,7 @@ impl Event { subnet_id: SubnetId, should_import: bool, ) -> Self { - Event::Work { + Self { message_id, peer_id, work: Work::Attestation(Box::new((attestation, subnet_id, should_import))), @@ -148,7 +143,7 @@ impl Event { peer_id: PeerId, aggregate: SignedAggregateAndProof, ) -> Self { - Event::Work { + Self { message_id, peer_id, work: Work::Aggregate(Box::new(aggregate)), @@ -188,54 +183,99 @@ impl GossipProcessor { /// error for an upstream caller to send a `WorkerIdle` message. pub fn spawn_manager(mut self) -> mpsc::Sender> { let (event_tx, mut event_rx) = mpsc::channel::>(MAX_WORK_QUEUE_LEN); + let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_WORK_QUEUE_LEN); let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN); let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); - let inner_event_tx = event_tx.clone(); let executor = self.executor.clone(); executor.spawn( async move { - while let Some(event) = event_rx.recv().await { + loop { + // Listen to both the event and idle channels, acting on whichever is ready + // first. + // + // Set `event = Some(event)` if there is new work to be done. Otherwise sets + // `event = None` if it was a worker becoming idle. + let event; + tokio::select! { + // A worker has finished some work. + idle_msg = idle_rx.recv() => { + if idle_msg.is_some() { + self.current_workers = self.current_workers.saturating_sub(1); + event = None + } else { + // Exit if all idle senders have been dropped. + // + // This shouldn't happen since this function holds a sender. + crit!( + self.log, + "Gossip processor stopped"; + "msg" => "all idle senders dropped" + ); + break + } + }, + // There is a new piece of work to be handled. + event_msg = event_rx.recv() => { + if let Some(event_msg) = event_msg { + event = Some(event_msg) + } else { + // Exit if all event senders have been dropped. + // + // This should happen when the client shuts down. + debug!( + self.log, + "Gossip processor stopped"; + "msg" => "all event senders dropped" + ); + break + } + } + }; + let _event_timer = metrics::start_timer(&metrics::GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS); metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_EVENTS_TOTAL); - if event == Event::WorkerIdle { - self.current_workers = self.current_workers.saturating_sub(1); - } - let can_spawn = self.current_workers < self.max_workers; let initial_aggregate_queue_len = aggregate_queue.len(); let initial_attestation_queue_len = attestation_queue.len(); match event { - Event::WorkerIdle => { + None if can_spawn => { // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively // give us more information with less signature verification time. if let Some(item) = aggregate_queue.pop() { self.spawn_worker( - inner_event_tx.clone(), + idle_tx.clone(), item.message_id, item.peer_id, Work::Aggregate(item.item), ); } else if let Some(item) = attestation_queue.pop() { self.spawn_worker( - inner_event_tx.clone(), + idle_tx.clone(), item.message_id, item.peer_id, Work::Attestation(item.item), ); } } - Event::Work { + None => { + warn!( + self.log, + "Unexpected gossip processor condition"; + "msg" => "unable to spawn after worker becomes idle" + ); + } + Some(Event { message_id, peer_id, work, - } => match work { + }) => match work { Work::Attestation(_) if can_spawn => { - self.spawn_worker(inner_event_tx.clone(), message_id, peer_id, work) + self.spawn_worker(idle_tx.clone(), message_id, peer_id, work) } Work::Attestation(attestation) => attestation_queue.push(QueueItem { message_id, @@ -243,7 +283,7 @@ impl GossipProcessor { item: attestation, }), Work::Aggregate(_) if can_spawn => { - self.spawn_worker(inner_event_tx.clone(), message_id, peer_id, work) + self.spawn_worker(idle_tx.clone(), message_id, peer_id, work) } Work::Aggregate(aggregate) => aggregate_queue.push(QueueItem { message_id, @@ -300,7 +340,7 @@ impl GossipProcessor { /// Sends an `Event::WorkerIdle` message on `event_tx` when the work is complete. fn spawn_worker( &mut self, - mut event_tx: mpsc::Sender>, + mut idle_tx: mpsc::Sender<()>, message_id: MessageId, peer_id: PeerId, work: Work, @@ -462,7 +502,7 @@ impl GossipProcessor { }; handler(); - event_tx.try_send(Event::WorkerIdle).unwrap_or_else(|e| { + idle_tx.try_send(()).unwrap_or_else(|e| { crit!( log, "Unable to free worker";