Skip to content

Commit

Permalink
perf: mine new blocks on blocking task (#7328)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Mar 7, 2024
1 parent b6e7c8b commit f787fed
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions crates/anvil/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::time::Interval;
use tokio::{task::JoinHandle, time::Interval};

/// The type that drives the blockchain's state
///
Expand Down Expand Up @@ -101,17 +101,13 @@ impl Future for NodeService {
}
}

// The type of the future that mines a new block
type BlockMiningFuture =
Pin<Box<dyn Future<Output = (MinedBlockOutcome, Arc<Backend>)> + Send + Sync>>;

/// A type that exclusively mines one block at a time
#[must_use = "streams do nothing unless polled"]
struct BlockProducer {
/// Holds the backend if no block is being mined
idle_backend: Option<Arc<Backend>>,
/// Single active future that mines a new block
block_mining: Option<BlockMiningFuture>,
block_mining: Option<JoinHandle<(MinedBlockOutcome, Arc<Backend>)>>,
/// backlog of sets of transactions ready to be mined
queued: VecDeque<Vec<Arc<PoolTransaction>>>,
}
Expand All @@ -133,19 +129,33 @@ impl Stream for BlockProducer {
if !pin.queued.is_empty() {
if let Some(backend) = pin.idle_backend.take() {
let transactions = pin.queued.pop_front().expect("not empty; qed");
pin.block_mining = Some(Box::pin(async move {
trace!(target: "miner", "creating new block");
let block = backend.mine_block(transactions).await;
trace!(target: "miner", "created new block: {}", block.block_number);
(block, backend)
}));

// we spawn this on as blocking task because in this can be blocking for a while in
// forking mode, because of all the rpc calls to fetch the required state
let handle = tokio::runtime::Handle::current();
let mining = tokio::task::spawn_blocking(move || {
handle.block_on(async move {
trace!(target: "miner", "creating new block");
let block = backend.mine_block(transactions).await;
trace!(target: "miner", "created new block: {}", block.block_number);
(block, backend)
})
});
pin.block_mining = Some(mining);
}
}

if let Some(mut mining) = pin.block_mining.take() {
if let Poll::Ready((outcome, backend)) = mining.poll_unpin(cx) {
pin.idle_backend = Some(backend);
return Poll::Ready(Some(outcome))
if let Poll::Ready(res) = mining.poll_unpin(cx) {
return match res {
Ok((outcome, backend)) => {
pin.idle_backend = Some(backend);
Poll::Ready(Some(outcome))
}
Err(err) => {
panic!("miner task failed: {}", err);
}
}
} else {
pin.block_mining = Some(mining)
}
Expand Down

0 comments on commit f787fed

Please sign in to comment.