Tidy, improve docs

paulhauner committed Aug 13, 2020
1 parent 5dbcd29 commit 84e16b9
Showing 2 changed files with 31 additions and 27 deletions.
50 changes: 27 additions & 23 deletions beacon_node/network/src/router/gossip_processor.rs
@@ -19,10 +19,10 @@
 //!
 //! ## Detail
 //!
-//! There is a single "manager" thread who listens to a channel of events. These events are either:
+//! There is a single "manager" thread who listens to two event channels. These events are either:
 //!
-//! - A new parcel of work.
-//! - Indication that a worker has finished a parcel of work.
+//! - A new parcel of work (work event).
+//! - Indication that a worker has finished a parcel of work (worker idle).
 //!
 //! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
 //!
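As a standalone illustration of the manager/worker pattern these docs describe (one async manager multiplexing a bounded work channel and an idle channel over at most `n` blocking workers), the sketch below may help. It is not the Lighthouse code: all names and the `tokio = "1"` APIs used (`mpsc`, `select!`, `spawn_blocking`) are assumptions for the example.

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Bounded channels: one for new work, one for "worker is idle" signals.
        let (work_tx, mut work_rx) = mpsc::channel::<u64>(16);
        let (idle_tx, mut idle_rx) = mpsc::channel::<()>(16);

        // Cap workers at the CPU count, as the module docs describe.
        let max_workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        let mut current_workers = 0_usize;
        let mut queue: Vec<u64> = Vec::new(); // pop() takes the newest item (LIFO)
        let mut work_open = true;

        // Simulate an upstream producer of work events.
        tokio::spawn(async move {
            for item in 0..8_u64 {
                let _ = work_tx.send(item).await;
            }
            // `work_tx` drops here, closing the work channel.
        });

        loop {
            tokio::select! {
                // A worker has finished a parcel of work.
                idle = idle_rx.recv() => {
                    if idle.is_some() {
                        current_workers -= 1;
                    }
                }
                // A new parcel of work arrived; `None` means all senders dropped.
                work = work_rx.recv(), if work_open => {
                    match work {
                        Some(item) => queue.push(item),
                        None => work_open = false,
                    }
                }
            }

            // Hand queued work to blocking workers while capacity remains.
            while current_workers < max_workers {
                let item = match queue.pop() {
                    Some(item) => item,
                    None => break,
                };
                current_workers += 1;
                let idle_tx = idle_tx.clone();
                tokio::task::spawn_blocking(move || {
                    let _result = item * 2; // stand-in for CPU-bound work
                    // Tell the manager this worker is idle again.
                    let _ = idle_tx.blocking_send(());
                });
            }

            if !work_open && queue.is_empty() && current_workers == 0 {
                break;
            }
        }
    }

A plain `Vec` stands in for the queue here; popping takes the newest item first, echoing the `LifoQueue` used in the real file.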
@@ -115,13 +115,13 @@ impl<T> LifoQueue<T> {

 /// An event to be processed by the manager task.
 #[derive(Debug, PartialEq)]
-pub struct Event<E: EthSpec> {
+pub struct WorkEvent<E: EthSpec> {
     message_id: MessageId,
     peer_id: PeerId,
     work: Work<E>,
 }

-impl<E: EthSpec> Event<E> {
+impl<E: EthSpec> WorkEvent<E> {
     /// Create a new `Work` event for some unaggregated attestation.
     pub fn unaggregated_attestation(
         message_id: MessageId,
@@ -174,15 +174,15 @@ pub struct GossipProcessor<T: BeaconChainTypes> {

 impl<T: BeaconChainTypes> GossipProcessor<T> {
     /// Spawns the "manager" task which checks the receiver end of the returned `Sender` for
-    /// messages which either:
+    /// messages which contain some new work which will be:
     ///
-    /// - Indicate that a worker has completed a task.
-    /// - Indicate that there is new work to be done.
+    /// - Performed immediately, if a worker is available.
+    /// - Queued for later processing, if no worker is currently available.
     ///
-    /// The caller should use the returned `Sender` **only** to send work to manager. It is a logic
-    /// error for an upstream caller to send a `WorkerIdle` message.
-    pub fn spawn_manager(mut self) -> mpsc::Sender<Event<T::EthSpec>> {
-        let (event_tx, mut event_rx) = mpsc::channel::<Event<T::EthSpec>>(MAX_WORK_QUEUE_LEN);
+    /// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
+    /// started with `spawn_blocking`.
+    pub fn spawn_manager(mut self) -> mpsc::Sender<WorkEvent<T::EthSpec>> {
+        let (event_tx, mut event_rx) = mpsc::channel::<WorkEvent<T::EthSpec>>(MAX_WORK_QUEUE_LEN);
         let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_WORK_QUEUE_LEN);
         let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN);
         let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN);
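`LifoQueue` itself lies outside the hunks shown here. A minimal bounded last-in-first-out queue of the sort these two lines construct might look like the following sketch (illustrative; the field names and the drop-oldest-when-full policy are assumptions, not the actual Lighthouse type):

    use std::collections::VecDeque;

    /// A bounded queue that yields the newest item first and sheds the oldest
    /// item once `max_length` is reached.
    pub struct LifoQueue<T> {
        queue: VecDeque<T>,
        max_length: usize,
    }

    impl<T> LifoQueue<T> {
        pub fn new(max_length: usize) -> Self {
            Self {
                queue: VecDeque::with_capacity(max_length),
                max_length,
            }
        }

        /// Add an item to the front, evicting the oldest item if full.
        pub fn push(&mut self, item: T) {
            if self.queue.len() == self.max_length {
                self.queue.pop_back();
            }
            self.queue.push_front(item);
        }

        /// Remove and return the most recently pushed item.
        pub fn pop(&mut self) -> Option<T> {
            self.queue.pop_front()
        }

        pub fn len(&self) -> usize {
            self.queue.len()
        }
    }

Serving the newest item first and shedding the oldest under load is one plausible policy for keeping gossip processing focused on fresh messages when the node falls behind.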
@@ -194,15 +194,15 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
                 // Listen to both the event and idle channels, acting on whichever is ready
                 // first.
                 //
-                // Set `event = Some(event)` if there is new work to be done. Otherwise sets
+                // Set `work_event = Some(event)` if there is new work to be done. Otherwise sets
                 // `event = None` if it was a worker becoming idle.
-                let event;
+                let work_event;
                 tokio::select! {
                     // A worker has finished some work.
-                    idle_msg = idle_rx.recv() => {
-                        if idle_msg.is_some() {
+                    new_idle_opt = idle_rx.recv() => {
+                        if new_idle_opt.is_some() {
                             self.current_workers = self.current_workers.saturating_sub(1);
-                            event = None
+                            work_event = None
                         } else {
                             // Exit if all idle senders have been dropped.
                             //
@@ -216,9 +216,9 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
                         }
                     },
                     // There is a new piece of work to be handled.
-                    event_msg = event_rx.recv() => {
-                        if let Some(event_msg) = event_msg {
-                            event = Some(event_msg)
+                    new_work_event_opt = event_rx.recv() => {
+                        if let Some(new_work_event) = new_work_event_opt {
+                            work_event = Some(new_work_event)
                         } else {
                             // Exit if all event senders have been dropped.
                             //
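The two "exit if all senders have been dropped" branches rely on a standard `tokio` mpsc property: once every `Sender` clone is dropped, `recv()` first drains any buffered messages and then resolves to `None`. A tiny self-contained demonstration (assuming `tokio = "1"`):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<u32>(4);
        tx.send(1).await.unwrap();
        drop(tx); // the only sender is gone, so the channel is now closed

        assert_eq!(rx.recv().await, Some(1)); // buffered items still drain
        assert_eq!(rx.recv().await, None); // then `recv()` resolves to `None`
    }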
@@ -241,7 +241,8 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
                 let initial_aggregate_queue_len = aggregate_queue.len();
                 let initial_attestation_queue_len = attestation_queue.len();

-                match event {
+                match work_event {
+                    // There is no new work event, but we are able to spawn a new worker.
                     None if can_spawn => {
                         // Check the aggregates, *then* the unaggregates since we assume that
                         // aggregates are more valuable to local validators and effectively
@@ -262,14 +262,17 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
                             );
                         }
                     }
+                    // There is no new work event and we are unable to spawn a new worker.
+                    //
+                    // I cannot see any good reason why this would happen.
                     None => {
                         warn!(
                             self.log,
                             "Unexpected gossip processor condition";
-                            "msg" => "unable to spawn after worker becomes idle"
+                            "msg" => "no new work and cannot spawn worker"
                         );
                     }
-                    Some(Event {
+                    Some(WorkEvent {
                         message_id,
                         peer_id,
                         work,
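For reference, the `None if can_spawn` arm above checks the aggregate queue before the attestation queue. Isolated from the rest of the manager, that priority rule reduces to a sketch like this (types and names are placeholders, not the real `Work` variants):

    /// Pop from the aggregate queue first, falling back to unaggregated
    /// attestations only when no aggregate work is waiting.
    fn next_work(
        aggregate_queue: &mut Vec<&'static str>,
        attestation_queue: &mut Vec<&'static str>,
    ) -> Option<&'static str> {
        aggregate_queue.pop().or_else(|| attestation_queue.pop())
    }

    fn main() {
        let mut aggregates = vec!["agg-1", "agg-2"];
        let mut attestations = vec!["att-1"];

        while let Some(work) = next_work(&mut aggregates, &mut attestations) {
            println!("dispatching {}", work);
        }
        // Prints agg-2, agg-1, att-1: aggregates first, newest first.
    }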
8 changes: 4 additions & 4 deletions beacon_node/network/src/router/processor.rs
@@ -1,4 +1,4 @@
-use super::gossip_processor::{Event as GossipProcessorEvent, GossipProcessor};
+use super::gossip_processor::{GossipProcessor, WorkEvent as GossipWorkEvent};
 use crate::service::NetworkMessage;
 use crate::sync::{PeerSyncInfo, SyncMessage};
 use beacon_chain::{
@@ -36,7 +36,7 @@ pub struct Processor<T: BeaconChainTypes> {
     /// A network context to return and handle RPC requests.
     network: HandlerNetworkContext<T::EthSpec>,
     /// A multi-threaded, non-blocking processor for signed, consensus gossip messages.
-    gossip_processor_send: mpsc::Sender<GossipProcessorEvent<T::EthSpec>>,
+    gossip_processor_send: mpsc::Sender<GossipWorkEvent<T::EthSpec>>,
     /// The `RPCHandler` logger.
     log: slog::Logger,
 }
@@ -605,7 +605,7 @@ impl<T: BeaconChainTypes> Processor<T> {
         should_process: bool,
     ) {
         self.gossip_processor_send
-            .try_send(GossipProcessorEvent::unaggregated_attestation(
+            .try_send(GossipWorkEvent::unaggregated_attestation(
                 message_id,
                 peer_id,
                 unaggregated_attestation,
@@ -629,7 +629,7 @@ impl<T: BeaconChainTypes> Processor<T> {
         aggregate: SignedAggregateAndProof<T::EthSpec>,
     ) {
         self.gossip_processor_send
-            .try_send(GossipProcessorEvent::aggregated_attestation(
+            .try_send(GossipWorkEvent::aggregated_attestation(
                 message_id, peer_id, aggregate,
             ))
             .unwrap_or_else(|e| {
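Both call sites push work through `try_send` on the bounded channel returned by `spawn_manager`: when the processor is saturated, the send fails immediately rather than blocking the network thread, and the failure is logged. A self-contained sketch of that behaviour (assuming `tokio = "1"`; names are illustrative):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Capacity 2: the third `try_send` fails with `TrySendError::Full`.
        let (tx, mut rx) = mpsc::channel::<&'static str>(2);

        for msg in ["attestation-1", "attestation-2", "attestation-3"] {
            tx.try_send(msg).unwrap_or_else(|e| {
                // Mirrors the diff: drop the message and log, never block.
                eprintln!("could not send message to gossip processor: {}", e);
            });
        }

        drop(tx); // close the channel so the receive loop terminates
        while let Some(msg) = rx.recv().await {
            println!("processing {}", msg);
        }
    }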
