Skip to content

Commit

Permalink
Merge branch 'tiago/phase-out-try-halt' (#1953)
Browse files Browse the repository at this point in the history
  • Loading branch information
sug0 committed Oct 13, 2023
2 parents 3cd3970 + af1fa0e commit 7ae503f
Show file tree
Hide file tree
Showing 15 changed files with 480 additions and 542 deletions.
2 changes: 2 additions & 0 deletions .changelog/unreleased/SDK/1953-phase-out-try-halt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Phase out Halt abstractions
([\#1953](https://github.com/anoma/namada/pull/1953))
12 changes: 9 additions & 3 deletions apps/src/lib/cli/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use namada::tendermint_rpc::HttpClient;
use namada::types::control_flow::Halt;
use namada::types::io::Io;
use namada_sdk::error::Error;
use namada_sdk::queries::Client;
use namada_sdk::rpc::wait_until_node_is_synched;
use tendermint_config::net::Address as TendermintAddress;
Expand All @@ -11,7 +11,10 @@ use crate::client::utils;
#[async_trait::async_trait(?Send)]
pub trait CliClient: Client + Sync {
fn from_tendermint_address(address: &mut TendermintAddress) -> Self;
async fn wait_until_node_is_synced(&self, io: &impl Io) -> Halt<()>;
async fn wait_until_node_is_synced(
&self,
io: &impl Io,
) -> Result<(), Error>;
}

#[async_trait::async_trait(?Send)]
Expand All @@ -20,7 +23,10 @@ impl CliClient for HttpClient {
HttpClient::new(utils::take_config_address(address)).unwrap()
}

async fn wait_until_node_is_synced(&self, io: &impl Io) -> Halt<()> {
async fn wait_until_node_is_synced(
&self,
io: &impl Io,
) -> Result<(), Error> {
wait_until_node_is_synched(self, io).await
}
}
Expand Down
197 changes: 39 additions & 158 deletions apps/src/lib/cli/client.rs

Large diffs are not rendered by default.

85 changes: 21 additions & 64 deletions apps/src/lib/cli/relayer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::sync::Arc;

use color_eyre::eyre::{eyre, Report, Result};
use color_eyre::eyre::Result;
use namada::eth_bridge::ethers::providers::{Http, Provider};
use namada::types::control_flow::ProceedOrElse;
use namada::types::io::Io;
use namada_sdk::eth_bridge::{bridge_pool, validator_set};

Expand All @@ -11,10 +10,6 @@ use crate::cli::api::{CliApi, CliClient};
use crate::cli::args::{CliToSdk, CliToSdkCtxless};
use crate::cli::cmds::*;

fn error() -> Report {
eyre!("Fatal error")
}

impl CliApi {
pub async fn handle_relayer_command<C>(
client: Option<C>,
Expand All @@ -36,15 +31,10 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let args = args.to_sdk(&mut ctx);
let namada = ctx.to_sdk(&client, io);
bridge_pool::recommend_batch(&namada, args)
.await
.proceed_or_else(error)?;
bridge_pool::recommend_batch(&namada, args).await?;
}
}
}
Expand All @@ -57,25 +47,17 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let args = args.to_sdk_ctxless();
bridge_pool::construct_proof(&client, io, args)
.await
.proceed_or_else(error)?;
bridge_pool::construct_proof(&client, io, args).await?;
}
EthBridgePoolWithoutCtx::RelayProof(RelayProof(mut args)) => {
let client = client.unwrap_or_else(|| {
C::from_tendermint_address(
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let eth_client = Arc::new(
Provider::<Http>::try_from(&args.eth_rpc_endpoint)
.unwrap(),
Expand All @@ -84,46 +66,34 @@ impl CliApi {
bridge_pool::relay_bridge_pool_proof(
eth_client, &client, io, args,
)
.await
.proceed_or_else(error)?;
.await?;
}
EthBridgePoolWithoutCtx::QueryPool(QueryEthBridgePool(
mut query,
)) => {
let client = client.unwrap_or_else(|| {
C::from_tendermint_address(&mut query.ledger_address)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
bridge_pool::query_bridge_pool(&client, io).await;
client.wait_until_node_is_synced(io).await?;
bridge_pool::query_bridge_pool(&client, io).await?;
}
EthBridgePoolWithoutCtx::QuerySigned(
QuerySignedBridgePool(mut query),
) => {
let client = client.unwrap_or_else(|| {
C::from_tendermint_address(&mut query.ledger_address)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
bridge_pool::query_signed_bridge_pool(&client, io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
bridge_pool::query_signed_bridge_pool(&client, io).await?;
}
EthBridgePoolWithoutCtx::QueryRelays(QueryRelayProgress(
mut query,
)) => {
let client = client.unwrap_or_else(|| {
C::from_tendermint_address(&mut query.ledger_address)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
bridge_pool::query_relay_progress(&client, io).await;
client.wait_until_node_is_synced(io).await?;
bridge_pool::query_relay_progress(&client, io).await?;
}
},
cli::NamadaRelayer::ValidatorSet(sub) => match sub {
Expand All @@ -135,15 +105,12 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let args = args.to_sdk_ctxless();
validator_set::query_bridge_validator_set(
&client, io, args,
)
.await;
.await?;
}
ValidatorSet::GovernanceValidatorSet(
GovernanceValidatorSet(mut args),
Expand All @@ -153,15 +120,12 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let args = args.to_sdk_ctxless();
validator_set::query_governnace_validator_set(
&client, io, args,
)
.await;
.await?;
}
ValidatorSet::ValidatorSetProof(ValidatorSetProof(
mut args,
Expand All @@ -171,15 +135,12 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let args = args.to_sdk_ctxless();
validator_set::query_validator_set_update_proof(
&client, io, args,
)
.await;
.await?;
}
ValidatorSet::ValidatorSetUpdateRelay(
ValidatorSetUpdateRelay(mut args),
Expand All @@ -189,10 +150,7 @@ impl CliApi {
&mut args.query.ledger_address,
)
});
client
.wait_until_node_is_synced(io)
.await
.proceed_or_else(error)?;
client.wait_until_node_is_synced(io).await?;
let eth_client = Arc::new(
Provider::<Http>::try_from(&args.eth_rpc_endpoint)
.unwrap(),
Expand All @@ -201,8 +159,7 @@ impl CliApi {
validator_set::relay_validator_set_update(
eth_client, &client, io, args,
)
.await
.proceed_or_else(error)?;
.await?;
}
},
}
Expand Down
3 changes: 1 addition & 2 deletions apps/src/lib/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use namada::ledger::queries::RPC;
use namada::ledger::storage::ConversionState;
use namada::proof_of_stake::types::{ValidatorState, WeightedValidator};
use namada::types::address::{masp, Address};
use namada::types::control_flow::ProceedOrElse;
use namada::types::hash::Hash;
use namada::types::io::Io;
use namada::types::key::*;
Expand Down Expand Up @@ -67,7 +66,7 @@ pub async fn query_tx_status<'a>(
) -> Event {
rpc::query_tx_status(namada, status, deadline)
.await
.proceed()
.unwrap()
}

/// Query and print the epoch of the last committed block
Expand Down
11 changes: 6 additions & 5 deletions apps/src/lib/node/ledger/shell/testing/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::ops::ControlFlow;

use clap::Command as App;
use eyre::Report;
use namada::types::control_flow::Halt;
use namada::types::io::Io;
use namada_sdk::error::Error as SdkError;
use tendermint_config::net::Address as TendermintAddress;

use super::node::MockNode;
Expand Down Expand Up @@ -98,7 +96,10 @@ impl<'a> CliClient for &'a MockNode {
unreachable!("MockNode should always be instantiated at test start.")
}

async fn wait_until_node_is_synced(&self, _io: &impl Io) -> Halt<()> {
ControlFlow::Continue(())
async fn wait_until_node_is_synced(
&self,
_io: &impl Io,
) -> Result<(), SdkError> {
Ok(())
}
}
104 changes: 0 additions & 104 deletions sdk/src/control_flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
pub mod time;

use std::future::Future;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -12,109 +11,6 @@ use futures::future::FutureExt;
#[cfg(any(unix, windows))]
use tokio::sync::oneshot;

/// A [`ControlFlow`] to control the halt status
/// of some execution context.
///
/// No return values are assumed to exist.
pub type Halt<T> = ControlFlow<(), T>;

/// Halt all execution.
pub const fn halt<T>() -> Halt<T> {
ControlFlow::Break(())
}

/// Proceed execution.
pub const fn proceed<T>(value: T) -> Halt<T> {
ControlFlow::Continue(value)
}

/// Convert from [`Halt`] to [`Result`].
#[allow(missing_docs)]
pub trait ProceedOrElse<T> {
fn proceed_or_else<F, E>(self, error: F) -> Result<T, E>
where
Self: Sized,
F: FnOnce() -> E;

#[inline]
fn proceed_or<E>(self, error: E) -> Result<T, E>
where
Self: Sized,
{
self.proceed_or_else(move || error)
}

#[inline]
fn proceed(self) -> T
where
Self: Sized,
{
self.proceed_or(()).expect("Halted execution")
}
}

impl<T> ProceedOrElse<T> for Halt<T> {
#[inline]
fn proceed_or_else<F, E>(self, error: F) -> Result<T, E>
where
Self: Sized,
F: FnOnce() -> E,
{
match self {
ControlFlow::Continue(x) => Ok(x),
ControlFlow::Break(()) => Err(error()),
}
}
}

/// Halting abstraction to obtain [`ControlFlow`] actions.
pub trait TryHalt<T, E> {
/// Possibly exit from some context, if we encounter an
/// error. We may recover from said error.
fn try_halt_or_recover<F>(self, handle_err: F) -> Halt<T>
where
F: FnMut(E) -> Halt<T>;

/// Exit from some context, if we encounter an error.
#[inline]
fn try_halt<F>(self, mut handle_err: F) -> Halt<T>
where
Self: Sized,
F: FnMut(E),
{
self.try_halt_or_recover(|e| {
handle_err(e);
halt()
})
}
}

impl<T, E> TryHalt<T, E> for Result<T, E> {
#[inline]
fn try_halt_or_recover<F>(self, mut handle_err: F) -> Halt<T>
where
F: FnMut(E) -> Halt<T>,
{
match self {
Ok(x) => proceed(x),
Err(e) => handle_err(e),
}
}
}

impl<L, R> TryHalt<R, L> for itertools::Either<L, R> {
#[inline]
fn try_halt_or_recover<F>(self, mut handle_err: F) -> Halt<R>
where
F: FnMut(L) -> Halt<R>,
{
match self {
itertools::Either::Right(x) => proceed(x),
itertools::Either::Left(e) => handle_err(e),
}
}
}

/// A shutdown signal receiver.
pub struct ShutdownSignal {
#[cfg(not(any(unix, windows)))]
Expand Down
Loading

0 comments on commit 7ae503f

Please sign in to comment.