Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mock ledger services in integration tests #1976

Merged
merged 7 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .changelog/unreleased/bug-fixes/1964-fix-protocol-txs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Fix broadcasting logic for protocol txs when a node operating the network is a
validator ([\#1964](https://github.com/anoma/namada/pull/1964))
2 changes: 2 additions & 0 deletions .changelog/unreleased/testing/1976-int-test-services.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Mock ledger services in integration tests
([\#1976](https://github.com/anoma/namada/pull/1976))
141 changes: 92 additions & 49 deletions apps/src/lib/node/ledger/ethereum_oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ops::ControlFlow;
use async_trait::async_trait;
use ethabi::Address;
use ethbridge_events::{event_codecs, EventKind};
use itertools::Either;
use namada::core::hints;
use namada::core::types::ethereum_structs;
use namada::eth_bridge::ethers;
Expand Down Expand Up @@ -75,13 +76,6 @@ pub trait RpcClient {
/// Ethereum event log.
type Log: IntoEthAbiLog;

/// Whether we should stop running the Ethereum oracle
/// if a call to [`Self::check_events_in_block`] fails.
///
/// This is only useful for testing purposes. In general,
/// no implementation should override this constant.
const EXIT_ON_EVENTS_FAILURE: bool = true;

/// Instantiate a new client, pointing to the
/// given RPC url.
fn new_client(rpc_url: &str) -> Self
Expand All @@ -108,6 +102,10 @@ pub trait RpcClient {
backoff: Duration,
deadline: Instant,
) -> Result<SyncStatus, Error>;

/// Given its current state, check if this RPC client
/// may recover from the given [`enum@Error`].
fn may_recover(&self, error: &Error) -> bool;
}

#[async_trait(?Send)]
Expand Down Expand Up @@ -172,6 +170,14 @@ impl RpcClient for Provider<Http> {
},
}
}

#[inline(always)]
fn may_recover(&self, error: &Error) -> bool {
!matches!(
error,
Error::Timeout | Error::Channel(_, _) | Error::CheckEvents(_, _, _)
)
}
}

/// A client that can talk to geth and parse
Expand All @@ -197,15 +203,18 @@ impl<C: RpcClient> Oracle<C> {
/// Construct a new [`Oracle`]. Note that it can not do anything until it
/// has been sent a configuration via the passed in `control` channel.
pub fn new(
url: &str,
client_or_url: Either<C, &str>,
sender: BoundedSender<EthereumEvent>,
last_processed_block: last_processed_block::Sender,
backoff: Duration,
ceiling: Duration,
control: control::Receiver,
) -> Self {
Self {
client: C::new_client(url),
client: match client_or_url {
Either::Left(client) => client,
Either::Right(url) => C::new_client(url),
},
sender,
backoff,
ceiling,
Expand Down Expand Up @@ -275,7 +284,7 @@ pub fn run_oracle<C: RpcClient>(
tracing::info!(?url, "Ethereum event oracle is starting");

let oracle = Oracle::<C>::new(
&url,
Either::Right(&url),
sender,
last_processed_block,
DEFAULT_BACKOFF,
Expand All @@ -300,6 +309,75 @@ pub fn run_oracle<C: RpcClient>(
.with_no_cleanup()
}

/// Determine what action to take after attempting to
/// process events contained in an Ethereum block.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum ProcessEventAction {
/// No events could be processed at this time, so we must keep
/// polling for new events.
ContinuePollingEvents,
/// Some error occurred while processing Ethereum events in
/// the current height. We must halt the oracle.
HaltOracle,
/// The current Ethereum block height has been processed.
/// We must advance to the next Ethereum height.
ProceedToNextBlock,
}

impl ProcessEventAction {
/// Check whether the action commands a new block to be processed.
#[inline]
pub fn process_new_block(&self) -> bool {
matches!(self, Self::ProceedToNextBlock)
}
}

impl ProcessEventAction {
/// Handles the requested oracle action, translating it to a format
/// understood by the set of [`Sleep`] abstractions.
fn handle(self) -> ControlFlow<Result<(), ()>, ()> {
match self {
ProcessEventAction::ContinuePollingEvents => {
ControlFlow::Continue(())
}
ProcessEventAction::HaltOracle => ControlFlow::Break(Err(())),
ProcessEventAction::ProceedToNextBlock => {
ControlFlow::Break(Ok(()))
}
}
}
}

/// Tentatively process a batch of Ethereum events.
pub(crate) async fn try_process_eth_events<C: RpcClient>(
oracle: &Oracle<C>,
config: &Config,
next_block_to_process: &ethereum_structs::BlockHeight,
) -> ProcessEventAction {
process_events_in_block(next_block_to_process, oracle, config)
.await
.map_or_else(
|error| {
if oracle.client.may_recover(&error) {
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ProcessEventAction::ContinuePollingEvents
} else {
tracing::error!(
reason = %error,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ProcessEventAction::HaltOracle
}
},
|()| ProcessEventAction::ProceedToNextBlock,
)
}

/// Given an oracle, watch for new Ethereum events, processing
/// them into Namada native types.
///
Expand Down Expand Up @@ -334,43 +412,8 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {
);
let res = Sleep { strategy: Constant(oracle.backoff) }.run(|| async {
tokio::select! {
result = process(&oracle, &config, next_block_to_process.clone()) => {
match result {
Ok(()) => {
ControlFlow::Break(Ok(()))
},
Err(
reason @ (
Error::Timeout
| Error::Channel(_, _)
| Error::CheckEvents(_, _, _)
)
) => {
// the oracle is unresponsive, we don't want the test to end
if !C::EXIT_ON_EVENTS_FAILURE
&& matches!(&reason, Error::CheckEvents(_, _, _))
{
tracing::debug!("Allowing the Ethereum oracle to keep running");
return ControlFlow::Continue(());
}
tracing::error!(
%reason,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ControlFlow::Break(Err(()))
}
Err(error) => {
// this is a recoverable error, hence the debug log,
// to avoid spamming info logs
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ControlFlow::Continue(())
}
}
action = try_process_eth_events(&oracle, &config, &next_block_to_process) => {
action.handle()
},
_ = oracle.sender.closed() => {
tracing::info!(
Expand Down Expand Up @@ -400,10 +443,10 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {

/// Checks if the given block has any events relating to the bridge, and if so,
/// sends them to the oracle's `sender` channel
async fn process<C: RpcClient>(
async fn process_events_in_block<C: RpcClient>(
block_to_process: &ethereum_structs::BlockHeight,
oracle: &Oracle<C>,
config: &Config,
block_to_process: ethereum_structs::BlockHeight,
) -> Result<(), Error> {
let mut queue: Vec<PendingEvent> = vec![];
let pending = &mut queue;
Expand Down
20 changes: 12 additions & 8 deletions apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub mod event_log {
}
}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
pub mod mock_web3_client {
use std::borrow::Cow;
use std::fmt::Debug;
Expand Down Expand Up @@ -102,7 +102,7 @@ pub mod mock_web3_client {
/// reason is for interior mutability.
pub struct Web3Client(Arc<Mutex<Web3ClientInner>>);

/// Command sender for [`Web3`] instances.
/// Command sender for [`TestOracle`] instances.
pub struct Web3Controller(Arc<Mutex<Web3ClientInner>>);

impl Web3Controller {
Expand Down Expand Up @@ -148,8 +148,6 @@ pub mod mock_web3_client {
impl RpcClient for Web3Client {
type Log = ethabi::RawLog;

const EXIT_ON_EVENTS_FAILURE: bool = false;

#[cold]
fn new_client(_: &str) -> Self
where
Expand Down Expand Up @@ -184,14 +182,15 @@ pub mod mock_web3_client {
}
if client.last_block_processed.as_ref() < Some(&block_to_check)
{
client
.blocks_processed
.send(block_to_check.clone())
.unwrap();
_ = client.blocks_processed.send(block_to_check.clone());
client.last_block_processed = Some(block_to_check);
}
Ok(logs)
} else {
tracing::debug!(
"No events to be processed by the Test Ethereum oracle, \
as it has been artificially set as unresponsive"
);
Err(Error::CheckEvents(
ty.into(),
addr,
Expand All @@ -209,6 +208,11 @@ pub mod mock_web3_client {
let height = self.0.lock().unwrap().latest_block_height.clone();
Ok(SyncStatus::AtHeight(height))
}

#[inline(always)]
fn may_recover(&self, _: &Error) -> bool {
true
}
}

impl Web3Client {
Expand Down
3 changes: 3 additions & 0 deletions apps/src/lib/node/ledger/shell/finalize_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,7 @@ mod test_finalize_block {
let (mut shell, _recv, _, _) = setup_with_cfg(SetupCfg {
last_height: 0,
num_validators: 4,
..Default::default()
});

let mut validator_set: BTreeSet<WeightedValidator> =
Expand Down Expand Up @@ -2651,6 +2652,7 @@ mod test_finalize_block {
let (mut shell, _recv, _, _) = setup_with_cfg(SetupCfg {
last_height: 0,
num_validators,
..Default::default()
});
let mut params = read_pos_params(&shell.wl_storage).unwrap();
params.unbonding_len = 4;
Expand Down Expand Up @@ -3029,6 +3031,7 @@ mod test_finalize_block {
let (mut shell, _recv, _, _) = setup_with_cfg(SetupCfg {
last_height: 0,
num_validators,
..Default::default()
});
let mut params = read_pos_params(&shell.wl_storage).unwrap();
params.unbonding_len = 4;
Expand Down
Loading