Skip to content

Commit

Permalink
feat(exex): subscribe to notifications explicitly (paradigmxyz#10573)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Sep 6, 2024
1 parent 3ec5d37 commit a89de21
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion book/developers/exex/hello-world.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ reth = { git = "https://github.com/paradigmxyz/reth.git" } # Reth
reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging

eyre = "0.6" # Easy error handling
futures-util = "0.3" # Stream utilities for consuming notifications
```

### Default Reth node
Expand Down Expand Up @@ -101,13 +103,14 @@ If you try running a node with an ExEx that exits, the node will exit as well.
Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node

```rust,norun,noplayground,ignore
use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
10 changes: 7 additions & 3 deletions book/developers/exex/remote.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,15 @@ Don't forget to emit `ExExEvent::FinishedHeight`

```rust,norun,noplayground,ignore
// ...
use futures_util::StreamExt;
use reth_exex::{ExExContext, ExExEvent};
async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
Expand Down Expand Up @@ -332,6 +334,9 @@ fn main() -> eyre::Result<()> {
<summary>Click to expand</summary>

```rust,norun,noplayground,ignore
use std::sync::Arc;
use futures_util::StreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
Expand All @@ -340,7 +345,6 @@ use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
Expand Down Expand Up @@ -381,7 +385,7 @@ async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
Expand Down
6 changes: 4 additions & 2 deletions book/developers/exex/tracking-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
task::{ready, Context, Poll},
};
use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
Expand All @@ -40,7 +41,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down Expand Up @@ -101,6 +102,7 @@ use std::{
task::{ready, Context, Poll},
};
use futures_util::StreamExt;
use reth::{api::FullNodeComponents, primitives::BlockNumber};
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
Expand Down Expand Up @@ -130,7 +132,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain
Expand Down
6 changes: 3 additions & 3 deletions book/installation/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ You can build Reth on Linux, macOS, Windows, and Windows WSL2.
## Dependencies

First, **install Rust** using [rustup](https://rustup.rs/)
First, **install Rust** using [rustup](https://rustup.rs/)

```bash
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Expand All @@ -32,7 +32,7 @@ operating system:

These are needed to build bindings for Reth's database.

The Minimum Supported Rust Version (MSRV) of this project is 1.81.0. If you already have a version of Rust installed,
The Minimum Supported Rust Version (MSRV) of this project is 1.80.0. If you already have a version of Rust installed,
you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`.

## Build Reth
Expand Down Expand Up @@ -147,7 +147,7 @@ _(Thanks to Sigma Prime for this section from [their Lighthouse book](https://li

### Bus error (WSL2)

In WSL 2 on Windows, the default virtual disk size is set to 1TB.
In WSL 2 on Windows, the default virtual disk size is set to 1TB.

You must increase the allocated disk size for your WSL2 instance before syncing reth.

Expand Down
4 changes: 2 additions & 2 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ reth-metrics.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-payload-builder.workspace = true
reth-primitives-traits.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
Expand All @@ -45,9 +45,9 @@ reth-db-api.workspace = true
reth-db-common.workspace = true
reth-evm-ethereum.workspace = true
reth-node-api.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }

secp256k1.workspace = true

Expand Down
12 changes: 6 additions & 6 deletions crates/exex/exex/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use reth_node_api::{FullNodeComponents, NodeTypesWithEngine};
use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head;
use reth_tasks::TaskExecutor;
use tokio::sync::mpsc::{Receiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;

use crate::{ExExEvent, ExExNotification};
use crate::{ExExEvent, ExExNotifications};

/// Captures the context that an `ExEx` has access to.
pub struct ExExContext<Node: FullNodeComponents> {
Expand All @@ -24,13 +24,13 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what
/// blocks to receive notifications for.
pub events: UnboundedSender<ExExEvent>,
/// Channel to receive [`ExExNotification`]s.
/// Channel to receive [`ExExNotification`](crate::ExExNotification)s.
///
/// # Important
///
/// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the
/// node.
pub notifications: Receiver<ExExNotification>,
/// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is
/// considered delivered by the node.
pub notifications: ExExNotifications<Node>,

/// node components
pub components: Node,
Expand Down
Loading

0 comments on commit a89de21

Please sign in to comment.