Skip to content

Commit

Permalink
feat: test channel for pausing persistence receiver (#10699)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Sep 6, 2024
1 parent ed778e1 commit 27d4e8c
Showing 1 changed file with 69 additions and 0 deletions.
69 changes: 69 additions & 0 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,61 @@ mod tests {
};
use tokio::sync::mpsc::unbounded_channel;

/// This is a test channel that allows you to `release` any value that is in the channel.
///
/// If nothing has been sent, then the next value will be immediately sent.
#[allow(dead_code)]
struct TestChannel<T> {
/// If an item is sent to this channel, an item will be released in the wrapped channel
release: Receiver<()>,
/// The sender channel
tx: Sender<T>,
/// The receiver channel
rx: Receiver<T>,
}

impl<T: Send + 'static> TestChannel<T> {
/// Creates a new test channel
#[allow(dead_code)]
fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
let (original_tx, original_rx) = channel();
let (wrapped_tx, wrapped_rx) = channel();
let (release_tx, release_rx) = channel();
let handle = TestChannelHandle::new(release_tx);
let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
// spawn the task that listens and releases stuff
std::thread::spawn(move || test_channel.intercept_loop());
(original_tx, wrapped_rx, handle)
}

/// Runs the intercept loop, waiting for the handle to release a value
fn intercept_loop(&self) {
while self.release.recv() == Ok(()) {
let Ok(value) = self.rx.recv() else { return };

let _ = self.tx.send(value);
}
}
}

struct TestChannelHandle {
/// The sender to use for releasing values
release: Sender<()>,
}

impl TestChannelHandle {
/// Returns a [`TestChannelHandle`]
const fn new(release: Sender<()>) -> Self {
Self { release }
}

/// Signals to the channel task that a value should be released
#[allow(dead_code)]
fn release(&self) {
let _ = self.release.send(());
}
}

struct TestHarness {
tree: EngineApiTreeHandler<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes>>>,
Expand All @@ -2543,6 +2598,20 @@ mod tests {
impl TestHarness {
fn new(chain_spec: Arc<ChainSpec>) -> Self {
let (action_tx, action_rx) = channel();
Self::with_persistence_channel(chain_spec, action_tx, action_rx)
}

#[allow(dead_code)]
fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
(Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
}

fn with_persistence_channel(
chain_spec: Arc<ChainSpec>,
action_tx: Sender<PersistenceAction>,
action_rx: Receiver<PersistenceAction>,
) -> Self {
let persistence_handle = PersistenceHandle::new(action_tx);

let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
Expand Down

0 comments on commit 27d4e8c

Please sign in to comment.