Skip to content

Commit

Permalink
feat: bounded consensus events channel
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed May 14, 2024
1 parent 06ff479 commit 23581c8
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 39 deletions.
6 changes: 3 additions & 3 deletions crates/consensus/auto-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::{mpsc::Sender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::trace;

mod client;
Expand Down Expand Up @@ -97,7 +97,7 @@ pub struct AutoSealBuilder<Client, Pool, Engine: EngineTypes, EvmConfig> {
pool: Pool,
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
evm_config: EvmConfig,
}
Expand All @@ -115,7 +115,7 @@ where
chain_spec: Arc<ChainSpec>,
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
mode: MiningMode,
evm_config: EvmConfig,
Expand Down
21 changes: 13 additions & 8 deletions crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::{mpsc::Sender, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{debug, error, warn};

Expand All @@ -38,7 +38,7 @@ pub struct MiningTask<Client, Pool: TransactionPool, Executor, Engine: EngineTyp
/// backlog of sets of transactions ready to be mined
queued: VecDeque<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>,
// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
/// Used to notify consumers of new blocks
canon_state_notification: CanonStateNotificationSender,
/// The pipeline events to listen on
Expand All @@ -57,7 +57,7 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
pub(crate) fn new(
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
storage: Storage,
client: Client,
Expand Down Expand Up @@ -166,11 +166,16 @@ where
// send the new update to the engine, this will trigger the engine
// to download and execute the block we just inserted
let (tx, rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
if let Err(err) = to_engine
.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
})
.await
{
warn!("sending to consensus bounded channel: {err}");
}
debug!(target: "consensus::auto", ?state, "Sent fork choice update");

match rx.await.unwrap() {
Expand Down
36 changes: 25 additions & 11 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use reth_tokio_util::EventListeners;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::{mpsc::Sender, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tracing::warn;

/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
Expand All @@ -23,7 +24,7 @@ pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
pub(crate) to_engine: Sender<BeaconEngineMessage<Engine>>,
event_listeners: EventListeners<BeaconConsensusEngineEvent>,
}

Expand All @@ -43,7 +44,7 @@ where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
pub fn new(to_engine: Sender<BeaconEngineMessage<Engine>>) -> Self {
let event_listeners: EventListeners<BeaconConsensusEngineEvent> = Default::default();
let tx = event_listeners.clone_sender();
let _ = to_engine.send(BeaconEngineMessage::EventListener(tx));
Expand All @@ -60,7 +61,13 @@ where
cancun_fields: Option<CancunPayloadFields>,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx });
if let Err(err) = self
.to_engine
.send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
.await
{
warn!("sending to consensus bounded channel: {err}");
}
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}

Expand All @@ -74,32 +81,39 @@ where
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.await
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}

/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
async fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
if let Err(err) = self
.to_engine
.send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
.await
{
warn!("sending to consensus bounded channel: {err}");
}
rx
}

/// Sends a transition configuration exchange message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
pub async fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
if let Err(err) =
self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged).await
{
warn!("sending to consensus bounded channel: {err}");
}
}

/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
Expand Down
17 changes: 10 additions & 7 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::{
broadcast::Sender,
mpsc::{self, UnboundedSender},
broadcast::Sender as BroadcastSender,
mpsc::{self, Sender as BoundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::ReceiverStream;
use tracing::*;

mod message;
Expand Down Expand Up @@ -90,6 +90,9 @@ const MAX_INVALID_HEADERS: u32 = 512u32;
/// If the distance exceeds this threshold, the pipeline will be used for sync.
pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// The size of the channel used to receive consensus events sent from the RPC.
pub const DEFAULT_CONSENSUS_CHANNEL_SIZE: usize = 1000;

/// The beacon consensus engine is the driver that switches between historical and live sync.
///
/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
Expand Down Expand Up @@ -237,7 +240,7 @@ where
pipeline_run_threshold: u64,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let (to_engine, rx) = mpsc::unbounded_channel();
let (to_engine, rx) = mpsc::channel(DEFAULT_CONSENSUS_CHANNEL_SIZE);
Self::with_channel(
client,
pipeline,
Expand All @@ -250,7 +253,7 @@ where
target,
pipeline_run_threshold,
to_engine,
Box::pin(UnboundedReceiverStream::from(rx)),
Box::pin(ReceiverStream::from(rx)),
hooks,
)
}
Expand Down Expand Up @@ -279,7 +282,7 @@ where
payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>,
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
to_engine: BoundedSender<BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
Expand Down Expand Up @@ -600,7 +603,7 @@ where

/// Sets a [Sender] to the engine's notifier. Also sets a [Sender] to
/// the sync controller's notifier.
pub(crate) fn set_sender(&mut self, sender: Sender<BeaconConsensusEngineEvent>) {
pub(crate) fn set_sender(&mut self, sender: BroadcastSender<BeaconConsensusEngineEvent>) {
self.listeners.set_sender(sender.clone());
self.sync.set_sender(sender);
}
Expand Down
13 changes: 8 additions & 5 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{future, future::Either, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::AutoSealConsensus;
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine, EthBeaconConsensus,
BeaconConsensusEngine, EthBeaconConsensus, DEFAULT_CONSENSUS_CHANNEL_SIZE,
};
use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
Expand All @@ -35,8 +35,11 @@ use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::TransactionPool;
use std::{future::Future, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::{
mpsc::{channel, unbounded_channel},
oneshot,
};
use tokio_stream::wrappers::ReceiverStream;

pub mod common;
pub use common::LaunchContext;
Expand Down Expand Up @@ -258,10 +261,10 @@ where

// create pipeline
let network_client = node_adapter.network().fetch_client().await?;
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
let (consensus_engine_tx, consensus_engine_rx) = channel(DEFAULT_CONSENSUS_CHANNEL_SIZE);

let node_config = ctx.node_config();
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
let consensus_engine_stream = ReceiverStream::from(consensus_engine_rx)
.maybe_skip_fcu(node_config.debug.skip_fcu)
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
// Store messages _after_ skipping so that `replay-engine` command
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use reth_rpc_layer::JwtSecret;
use reth_tasks::TokioTaskExecutor;
use reth_transaction_pool::test_utils::{TestPool, TestPoolBuilder};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::channel;

/// Localhost with port 0 so a free port is used.
pub fn test_address() -> SocketAddr {
Expand All @@ -25,7 +25,7 @@ pub fn test_address() -> SocketAddr {
/// Launches a new server for the auth module
pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
let config = AuthServerConfig::builder(secret).socket_addr(test_address()).build();
let (tx, _rx) = unbounded_channel();
let (tx, _rx) = channel(1000);
let beacon_engine_handle = BeaconConsensusEngineHandle::<EthEngineTypes>::new(tx);
let engine_api = EngineApi::new(
NoopProvider::default(),
Expand Down
6 changes: 3 additions & 3 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,14 +778,14 @@ mod tests {
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
use reth_tasks::TokioTaskExecutor;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::mpsc::{channel, Receiver};

fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>, EthEngineTypes>)
{
let chain_spec: Arc<ChainSpec> = MAINNET.clone();
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service();
let (to_engine, engine_rx) = unbounded_channel();
let (to_engine, engine_rx) = channel(1000);
let task_executor = Box::<TokioTaskExecutor>::default();
let api = EngineApi::new(
provider.clone(),
Expand All @@ -801,7 +801,7 @@ mod tests {
struct EngineApiTestHandle {
chain_spec: Arc<ChainSpec>,
provider: Arc<MockEthProvider>,
from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
from_api: Receiver<BeaconEngineMessage<EthEngineTypes>>,
}

#[tokio::test]
Expand Down

0 comments on commit 23581c8

Please sign in to comment.