Skip to content

Commit

Permalink
Separate idle/event messages
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Aug 13, 2020
1 parent a6091dd commit 5dbcd29
Showing 1 changed file with 66 additions and 26 deletions.
92 changes: 66 additions & 26 deletions beacon_node/network/src/router/gossip_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,10 @@ impl<T> LifoQueue<T> {

/// An event to be processed by the manager task.
#[derive(Debug, PartialEq)]
pub enum Event<E: EthSpec> {
/// A worker has finished its work and is idle/shutdown.
WorkerIdle,
/// There is some work to be done.
Work {
message_id: MessageId,
peer_id: PeerId,
work: Work<E>,
},
pub struct Event<E: EthSpec> {
message_id: MessageId,
peer_id: PeerId,
work: Work<E>,
}

impl<E: EthSpec> Event<E> {
Expand All @@ -135,7 +130,7 @@ impl<E: EthSpec> Event<E> {
subnet_id: SubnetId,
should_import: bool,
) -> Self {
Event::Work {
Self {
message_id,
peer_id,
work: Work::Attestation(Box::new((attestation, subnet_id, should_import))),
Expand All @@ -148,7 +143,7 @@ impl<E: EthSpec> Event<E> {
peer_id: PeerId,
aggregate: SignedAggregateAndProof<E>,
) -> Self {
Event::Work {
Self {
message_id,
peer_id,
work: Work::Aggregate(Box::new(aggregate)),
Expand Down Expand Up @@ -188,62 +183,107 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
/// error for an upstream caller to send a `WorkerIdle` message.
pub fn spawn_manager(mut self) -> mpsc::Sender<Event<T::EthSpec>> {
let (event_tx, mut event_rx) = mpsc::channel::<Event<T::EthSpec>>(MAX_WORK_QUEUE_LEN);
let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_WORK_QUEUE_LEN);
let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
let inner_event_tx = event_tx.clone();
let executor = self.executor.clone();

executor.spawn(
async move {
while let Some(event) = event_rx.recv().await {
loop {
// Listen to both the event and idle channels, acting on whichever is ready
// first.
//
// Set `event = Some(event)` if there is new work to be done. Otherwise sets
// `event = None` if it was a worker becoming idle.
let event;
tokio::select! {
// A worker has finished some work.
idle_msg = idle_rx.recv() => {
if idle_msg.is_some() {
self.current_workers = self.current_workers.saturating_sub(1);
event = None
} else {
// Exit if all idle senders have been dropped.
//
// This shouldn't happen since this function holds a sender.
crit!(
self.log,
"Gossip processor stopped";
"msg" => "all idle senders dropped"
);
break
}
},
// There is a new piece of work to be handled.
event_msg = event_rx.recv() => {
if let Some(event_msg) = event_msg {
event = Some(event_msg)
} else {
// Exit if all event senders have been dropped.
//
// This should happen when the client shuts down.
debug!(
self.log,
"Gossip processor stopped";
"msg" => "all event senders dropped"
);
break
}
}
};

let _event_timer =
metrics::start_timer(&metrics::GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS);
metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_EVENTS_TOTAL);

if event == Event::WorkerIdle {
self.current_workers = self.current_workers.saturating_sub(1);
}

let can_spawn = self.current_workers < self.max_workers;
let initial_aggregate_queue_len = aggregate_queue.len();
let initial_attestation_queue_len = attestation_queue.len();

match event {
Event::WorkerIdle => {
None if can_spawn => {
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively
// give us more information with less signature verification time.
if let Some(item) = aggregate_queue.pop() {
self.spawn_worker(
inner_event_tx.clone(),
idle_tx.clone(),
item.message_id,
item.peer_id,
Work::Aggregate(item.item),
);
} else if let Some(item) = attestation_queue.pop() {
self.spawn_worker(
inner_event_tx.clone(),
idle_tx.clone(),
item.message_id,
item.peer_id,
Work::Attestation(item.item),
);
}
}
Event::Work {
None => {
warn!(
self.log,
"Unexpected gossip processor condition";
"msg" => "unable to spawn after worker becomes idle"
);
}
Some(Event {
message_id,
peer_id,
work,
} => match work {
}) => match work {
Work::Attestation(_) if can_spawn => {
self.spawn_worker(inner_event_tx.clone(), message_id, peer_id, work)
self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
}
Work::Attestation(attestation) => attestation_queue.push(QueueItem {
message_id,
peer_id,
item: attestation,
}),
Work::Aggregate(_) if can_spawn => {
self.spawn_worker(inner_event_tx.clone(), message_id, peer_id, work)
self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
}
Work::Aggregate(aggregate) => aggregate_queue.push(QueueItem {
message_id,
Expand Down Expand Up @@ -300,7 +340,7 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
/// Sends an `Event::WorkerIdle` message on `event_tx` when the work is complete.
fn spawn_worker(
&mut self,
mut event_tx: mpsc::Sender<Event<T::EthSpec>>,
mut idle_tx: mpsc::Sender<()>,
message_id: MessageId,
peer_id: PeerId,
work: Work<T::EthSpec>,
Expand Down Expand Up @@ -462,7 +502,7 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
};
handler();

event_tx.try_send(Event::WorkerIdle).unwrap_or_else(|e| {
idle_tx.try_send(()).unwrap_or_else(|e| {
crit!(
log,
"Unable to free worker";
Expand Down

0 comments on commit 5dbcd29

Please sign in to comment.