diff --git a/bridges/modules/substrate/src/lib.rs b/bridges/modules/substrate/src/lib.rs index ca3fe2839a56..ad27627f62ca 100644 --- a/bridges/modules/substrate/src/lib.rs +++ b/bridges/modules/substrate/src/lib.rs @@ -247,6 +247,9 @@ mod tests { type MaximumBlockLength = (); type Version = (); type ModuleToIndex = (); + type AccountData = (); + type OnNewAccount = (); + type OnReapAccount = (); } impl Trait for Test {} diff --git a/bridges/relays/substrate/Cargo.toml b/bridges/relays/substrate/Cargo.toml new file mode 100644 index 000000000000..d3382cedd462 --- /dev/null +++ b/bridges/relays/substrate/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "substrate-bridge" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +async-std = "1.0.1" +clap = "2.3.3" +ctrlc = "3.1.3" +derive_more = "0.99.1" +env_logger = "0.7.1" +futures = "0.3.1" +jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee", features = ["ws"] } +log = "0.4.8" +node-primitives = { version = "2.0.0", git = "https://github.com/paritytech/substrate" } +serde_json = "1.0.41" +sp-core = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git" } +sp-rpc = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git" } +url = "2.1.0" diff --git a/bridges/relays/substrate/README.md b/bridges/relays/substrate/README.md new file mode 100644 index 000000000000..dcbaeaaed5a8 --- /dev/null +++ b/bridges/relays/substrate/README.md @@ -0,0 +1,32 @@ +# Substrate-to-Substrate Bridge Relay + +The bridge relay is a process that connects to running Substrate nodes and sends data over the Substrate-to-Substrate bridge. The process communicates with the nodes over the JSON-RPC interface and reads data from the relays information required by the `bridge` pallet using runtime calls and writes data to the modules by constructing and submitting extrinsics. + +For more details, see the [design document](doc/design.md). + +## Status + +This is a not-in-progress prototype. + +## Running in development + +Run two development Substrate chains: + +```bash +> TMPDIR=(mktemp -d) +> cd $TMPDIR +> substrate build-spec --dev > red-spec.json +> cp red-spec.json blue-spec.json +# Modify the chain spec in an editor so that the genesis hashes of the two chains differ. +# For example, double one of the balances in '$.genesis.runtime.balances.balances'. +> substrate --chain red-spec.json --alice --base-path ./red --port 30343 --ws-port 9954 +> substrate --chain blue-spec.json --alice --base-path ./blue --port 30353 --ws-port 9964 +``` + +Now run the bridge relay: + +``` +> target/release/substrate-bridge --base-path ./relay \ + --rpc-url ws://localhost:9954 \ + --rpc-url ws://localhost:9964 +``` diff --git a/bridges/relays/substrate/src/bridge.rs b/bridges/relays/substrate/src/bridge.rs new file mode 100644 index 000000000000..7a733c5862e5 --- /dev/null +++ b/bridges/relays/substrate/src/bridge.rs @@ -0,0 +1,391 @@ +use crate::error::Error; +use crate::rpc::{self, SubstrateRPC}; +use crate::params::{RPCUrlParam, Params}; + +use futures::{prelude::*, channel::{mpsc, oneshot}, future, select}; +use jsonrpsee::{ + core::client::{RawClientError, RawClientEvent, RawClientRequestId, RawClientSubscription}, + ws::{WsRawClient, WsConnecError, ws_raw_client}, +}; +use node_primitives::{Hash, Header}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::pin::Pin; +use sp_core::Bytes; + +type ChainId = Hash; + +struct BridgeState { + channel: mpsc::Sender, + locally_finalized_head_on_bridged_chain: Header, +} + +struct ChainState { + current_finalized_head: Header, + bridges: HashMap, +} + +enum Event { + SubmitExtrinsic(Bytes), +} + +struct Chain { + url: String, + client: WsRawClient, + sender: mpsc::Sender, + receiver: mpsc::Receiver, + genesis_hash: Hash, + state: ChainState, +} + +async fn init_rpc_connection(url: &RPCUrlParam) -> Result { + let url_str = url.to_string(); + log::debug!("Connecting to {}", url_str); + + // Skip the leading "ws://" and trailing "/". + let url_without_scheme = &url_str[5..(url_str.len() - 1)]; + let mut client = ws_raw_client(url_without_scheme) + .await + .map_err(|err| Error::WsConnectionError(err.to_string()))?; + + let genesis_hash = rpc::genesis_block_hash(&mut client) + .await + .map_err(|e| Error::RPCError(e.to_string()))? + .ok_or_else(|| Error::InvalidChainState(format!( + "chain with RPC URL {} is missing a genesis block hash", + url_str, + )))?; + + let latest_finalized_hash = SubstrateRPC::chain_finalized_head(&mut client) + .await + .map_err(|e| Error::RPCError(e.to_string()))?; + let latest_finalized_header = SubstrateRPC::chain_header( + &mut client, + Some(latest_finalized_hash) + ) + .await + .map_err(|e| Error::RPCError(e.to_string()))? + .ok_or_else(|| Error::InvalidChainState(format!( + "chain {} is missing header for finalized block hash {}", + genesis_hash, latest_finalized_hash + )))?; + + let (sender, receiver) = mpsc::channel(0); + + Ok(Chain { + url: url_str, + client, + sender, + receiver, + genesis_hash, + state: ChainState { + current_finalized_head: latest_finalized_header, + bridges: HashMap::new(), + } + }) +} + +/// Returns IDs of the bridged chains. +async fn read_bridges(chain: &mut Chain, chain_ids: &[Hash]) + -> Result, Error> +{ + // This should make an RPC call to read this information from the bridge pallet state. + // For now, just pretend every chain is bridged to every other chain. + // + // TODO: The correct thing. + Ok( + chain_ids + .iter() + .cloned() + .filter(|&chain_id| chain_id != chain.genesis_hash) + .collect() + ) +} + +pub async fn run_async( + params: Params, + exit: Box + Unpin + Send> +) -> Result<(), Error> +{ + let chains = init_chains(¶ms).await?; + + let (chain_tasks, exit_signals) = chains.into_iter() + .map(|(chain_id, chain_cell)| { + let chain = chain_cell.into_inner(); + let (task_exit_signal, task_exit_receiver) = oneshot::channel(); + let task_exit = Box::new(task_exit_receiver.map(|result| { + result.expect("task_exit_signal is not dropped before send() is called") + })); + let chain_task = async_std::task::spawn(async move { + if let Err(err) = chain_task(chain_id, chain, task_exit).await { + log::error!("Error in task for chain {}: {}", chain_id, err); + } + }); + (chain_task, task_exit_signal) + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + async_std::task::spawn(async move { + exit.await; + for exit_signal in exit_signals { + let _ = exit_signal.send(()); + } + }); + + future::join_all(chain_tasks).await; + Ok(()) +} + +fn initial_next_events<'a>(chains: &'a HashMap>) + -> Vec> + 'a>>> +{ + chains.values() + .map(|chain_cell| async move { + let mut chain = chain_cell.borrow_mut(); + let event = chain.client.next_event() + .await + .map_err(|err| Error::RPCError(err.to_string()))?; + Ok((chain.genesis_hash, event)) + }) + .map(|fut| Box::pin(fut) as Pin>>) + .collect() +} + +async fn next_event<'a>( + next_events: Vec> + 'a>>>, + chains: &'a HashMap>, +) + -> ( + Result<(Hash, RawClientEvent), Error>, + Vec> +'a>>> + ) +{ + let (result, _, mut rest) = future::select_all(next_events).await; + + match result { + Ok((chain_id, _)) => { + let fut = async move { + let chain_cell = chains.get(&chain_id) + .expect("chain must be in the map as a function precondition; qed"); + let mut chain = chain_cell.borrow_mut(); + let event = chain.client.next_event() + .await + .map_err(|err| Error::RPCError(err.to_string()))?; + Ok((chain_id, event)) + }; + rest.push(Box::pin(fut)); + } + Err(ref err) => log::warn!("error in RPC connection with a chain: {}", err), + } + + (result, rest) +} + +async fn init_chains(params: &Params) -> Result>, Error> { + let chains = future::join_all(params.rpc_urls.iter().map(init_rpc_connection)) + .await + .into_iter() + .map(|result| result.map(|chain| (chain.genesis_hash, RefCell::new(chain)))) + .collect::, _>>()?; + + // TODO: Remove when read_bridges is implemented correctly. + let chain_ids = chains.keys() + .cloned() + .collect::>(); + // let chain_ids_slice = chain_ids.as_slice(); + + for (&chain_id, chain_cell) in chains.iter() { + let mut chain = chain_cell.borrow_mut(); + for bridged_chain_id in read_bridges(&mut chain, &chain_ids).await? { + if chain_id == bridged_chain_id { + log::warn!("chain {} has a bridge to itself", chain_id); + continue; + } + + if let Some(bridged_chain_cell) = chains.get(&bridged_chain_id) { + let bridged_chain = bridged_chain_cell.borrow_mut(); + + // TODO: Get this from RPC to runtime API. + let genesis_head = SubstrateRPC::chain_header(&mut chain.client, chain_id) + .await + .map_err(|e| Error::RPCError(e.to_string()))? + .ok_or_else(|| Error::InvalidChainState(format!( + "chain {} is missing a genesis block header", chain_id + )))?; + + let channel = chain.sender.clone(); + chain.state.bridges.insert(bridged_chain_id, BridgeState { + channel, + locally_finalized_head_on_bridged_chain: genesis_head, + }); + + // The conditional ensures that we don't log twice per pair of chains. + if chain_id.as_ref() < bridged_chain_id.as_ref() { + log::info!("initialized bridge between {} and {}", chain_id, bridged_chain_id); + } + } + } + } + + Ok(chains) +} + +async fn setup_subscriptions(chain: &mut Chain) + -> Result<(RawClientRequestId, RawClientRequestId), RawClientError> +{ + let new_heads_subscription_id = chain.client + .start_subscription( + "chain_subscribeNewHeads", + jsonrpsee::core::common::Params::None, + ) + .await + .map_err(RawClientError::Inner)?; + + let finalized_heads_subscription_id = chain.client + .start_subscription( + "chain_subscribeFinalizedHeads", + jsonrpsee::core::common::Params::None, + ) + .await + .map_err(RawClientError::Inner)?; + + let new_heads_subscription = + chain.client.subscription_by_id(new_heads_subscription_id) + .expect("subscription_id was returned from start_subscription above; qed"); + let new_heads_subscription = match new_heads_subscription { + RawClientSubscription::Active(_) => {} + RawClientSubscription::Pending(subscription) => { + subscription.wait().await?; + } + }; + + let finalized_heads_subscription = + chain.client.subscription_by_id(finalized_heads_subscription_id) + .expect("subscription_id was returned from start_subscription above; qed"); + let finalized_heads_subscription = match finalized_heads_subscription { + RawClientSubscription::Active(subscription) => {} + RawClientSubscription::Pending(subscription) => { + subscription.wait().await?; + } + }; + + Ok((new_heads_subscription_id, finalized_heads_subscription_id)) +} + +async fn handle_rpc_event( + chain_id: ChainId, + chain: &mut Chain, + event: RawClientEvent, + new_heads_subscription_id: RawClientRequestId, + finalized_heads_subscription_id: RawClientRequestId, +) -> Result<(), Error> +{ + match event { + RawClientEvent::SubscriptionNotif { request_id, result } => + if request_id == new_heads_subscription_id { + let header: Header = serde_json::from_value(result) + .map_err(Error::SerializationError)?; + log::info!("Received new head {:?} on chain {}", header, chain_id); + } else if request_id == finalized_heads_subscription_id { + let header: Header = serde_json::from_value(result) + .map_err(Error::SerializationError)?; + log::info!("Received finalized head {:?} on chain {}", header, chain_id); + + // let old_finalized_head = chain_state.current_finalized_head; + chain.state.current_finalized_head = header; + for (bridged_chain_id, bridged_chain) in chain.state.bridges.iter_mut() { + if bridged_chain.locally_finalized_head_on_bridged_chain.number < + chain.state.current_finalized_head.number { + // Craft and submit an extrinsic over RPC + log::info!("Sending command to submit extrinsic to chain {}", chain_id); + let mut send_event = bridged_chain.channel + .send(Event::SubmitExtrinsic(Bytes(Vec::new()))) + .fuse(); + + // Continue processing events from other chain tasks while waiting to send + // event to other chain task in order to prevent deadlocks. + loop { + select! { + result = send_event => { + result.map_err(Error::ChannelError)?; + break; + } + event = chain.receiver.next().fuse() => { + let event = event + .expect("stream will never close as the chain has an mpsc Sender"); + handle_bridge_event(chain_id, &mut chain.client, event) + .await?; + } + // TODO: exit + } + } + } + } + } else { + return Err(Error::RPCError(format!( + "unexpected subscription response with request ID {:?}", request_id + ))); + }, + _ => return Err(Error::RPCError(format!( + "unexpected RPC event from chain {}: {:?}", chain_id, event + ))), + } + Ok(()) +} + +// Let's say this never sends over a channel (ie. cannot block on another task). +async fn handle_bridge_event( + chain_id: ChainId, + rpc_client: &mut WsRawClient, + event: Event, +) -> Result<(), Error> +{ + match event { + Event::SubmitExtrinsic(data) => { + log::info!("Submitting extrinsic to chain {}", chain_id); + if let Err(err) = SubstrateRPC::author_submit_extrinsic(rpc_client, data).await { + log::error!("failed to submit extrinsic: {}", err); + } + } + } + Ok(()) +} + +async fn chain_task( + chain_id: ChainId, + mut chain: Chain, + exit: impl Future + Unpin + Send +) -> Result<(), Error> +{ + let (new_heads_subscription_id, finalized_heads_subscription_id) = + setup_subscriptions(&mut chain) + .await + .map_err(|e| Error::RPCError(e.to_string()))?; + + let mut exit = exit.fuse(); + loop { + select! { + result = chain.client.next_event().fuse() => { + let event = result.map_err(|e| Error::RPCError(e.to_string()))?; + handle_rpc_event( + chain_id, + &mut chain, + event, + new_heads_subscription_id, + finalized_heads_subscription_id, + ).await?; + } + event = chain.receiver.next().fuse() => { + let event = event + .expect("stream will never close as the chain has an mpsc Sender"); + handle_bridge_event(chain_id, &mut chain.client, event) + .await?; + } + _ = exit => { + log::debug!("Received exit signal, shutting down task for chain {}", chain_id); + break; + } + } + } + Ok(()) +} diff --git a/bridges/relays/substrate/src/error.rs b/bridges/relays/substrate/src/error.rs new file mode 100644 index 000000000000..a612c84707d6 --- /dev/null +++ b/bridges/relays/substrate/src/error.rs @@ -0,0 +1,31 @@ +use futures::channel::mpsc; + +#[derive(Debug, derive_more::Display)] +pub enum Error { + #[display(fmt = "invalid RPC URL: {}", _0)] + UrlError(String), + #[display(fmt = "RPC response indicates invalid chain state: {}", _0)] + InvalidChainState(String), + #[display(fmt = "could not make RPC call: {}", _0)] + RPCError(String), + #[display(fmt = "could not connect to RPC URL: {}", _0)] + WsConnectionError(String), + #[display(fmt = "unexpected client event from RPC URL {}: {:?}", _0, _1)] + UnexpectedClientEvent(String, String), + #[display(fmt = "serialization error: {}", _0)] + SerializationError(serde_json::error::Error), + #[display(fmt = "invalid event received from bridged chain: {}", _0)] + InvalidBridgeEvent(String), + #[display(fmt = "error sending over MPSC channel: {}", _0)] + ChannelError(mpsc::SendError), +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::SerializationError(err) => Some(err), + Error::ChannelError(err) => Some(err), + _ => None, + } + } +} diff --git a/bridges/relays/substrate/src/main.rs b/bridges/relays/substrate/src/main.rs new file mode 100644 index 000000000000..bfa183cb2e50 --- /dev/null +++ b/bridges/relays/substrate/src/main.rs @@ -0,0 +1,80 @@ +mod bridge; +mod error; +mod params; +mod rpc; + +use bridge::run_async; +use params::{Params, RPCUrlParam}; + +use clap::{App, Arg, value_t, values_t}; +use futures::{prelude::*, channel}; +use std::cell::Cell; +use std::process; + +fn main() { + let params = parse_args(); + env_logger::init(); + let exit = setup_exit_handler(); + + let result = async_std::task::block_on(async move { + run_async(params, exit).await + }); + if let Err(err) = result { + log::error!("{}", err); + process::exit(1); + } +} + +fn parse_args() -> Params { + let matches = App::new("substrate-bridge") + .version("1.0") + .author("Parity Technologies") + .about("Bridges Substrates, duh") + .arg( + Arg::with_name("base-path") + .long("base-path") + .value_name("DIRECTORY") + .required(true) + .help("Sets the base path") + .takes_value(true), + ) + .arg( + Arg::with_name("rpc-url") + .long("rpc-url") + .value_name("HOST[:PORT]") + .help("The URL of a bridged Substrate node") + .takes_value(true) + .multiple(true) + ) + .get_matches(); + + let base_path = value_t!(matches, "base-path", String) + .unwrap_or_else(|e| e.exit()); + let rpc_urls = values_t!(matches, "rpc-url", RPCUrlParam) + .unwrap_or_else(|e| e.exit()); + + Params { + base_path, + rpc_urls, + } +} + +fn setup_exit_handler() -> Box + Unpin + Send> { + let (exit_sender, exit_receiver) = channel::oneshot::channel(); + let exit_sender = Cell::new(Some(exit_sender)); + + ctrlc::set_handler(move || { + if let Some(exit_sender) = exit_sender.take() { + if let Err(()) = exit_sender.send(()) { + log::warn!("failed to send exit signal"); + } + } + }) + .expect("must be able to set Ctrl-C handler"); + + Box::new(exit_receiver.map(|result| { + result.expect( + "exit_sender cannot be dropped as it is moved into a globally-referenced closure" + ) + })) +} diff --git a/bridges/relays/substrate/src/params.rs b/bridges/relays/substrate/src/params.rs new file mode 100644 index 000000000000..019581eb8c4b --- /dev/null +++ b/bridges/relays/substrate/src/params.rs @@ -0,0 +1,75 @@ +use crate::error::Error; + +use url::Url; +use std::str::FromStr; + +const DEFAULT_WS_PORT: u16 = 9944; + +#[derive(Debug, Clone)] +pub struct Params { + pub base_path: String, + pub rpc_urls: Vec, +} + +#[derive(Debug, Clone)] +pub struct RPCUrlParam { + url: Url, +} + +impl ToString for RPCUrlParam { + fn to_string(&self) -> String { + self.url.to_string() + } +} + +impl FromStr for RPCUrlParam { + type Err = Error; + + fn from_str(url_str: &str) -> Result { + let mut url = Url::parse(url_str) + .map_err(|e| Error::UrlError(format!("could not parse {}: {}", url_str, e)))?; + + if url.scheme() != "ws" { + return Err(Error::UrlError(format!("must have scheme ws, found {}", url.scheme()))); + } + + if url.port().is_none() { + url.set_port(Some(DEFAULT_WS_PORT)) + .expect("the scheme is checked above to be ws; qed"); + } + + if url.path() != "/" { + return Err(Error::UrlError(format!("cannot have a path, found {}", url.path()))); + } + if let Some(query) = url.query() { + return Err(Error::UrlError(format!("cannot have a query, found {}", query))); + } + if let Some(fragment) = url.fragment() { + return Err(Error::UrlError(format!("cannot have a fragment, found {}", fragment))); + } + + Ok(RPCUrlParam { url }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rpc_url_from_str() { + assert_eq!( + RPCUrlParam::from_str("ws://127.0.0.1").unwrap().to_string(), + "ws://127.0.0.1:9944/" + ); + assert_eq!( + RPCUrlParam::from_str("ws://127.0.0.1/").unwrap().to_string(), + "ws://127.0.0.1:9944/" + ); + assert_eq!( + RPCUrlParam::from_str("ws://127.0.0.1:4499").unwrap().to_string(), + "ws://127.0.0.1:4499/" + ); + assert!(RPCUrlParam::from_str("http://127.0.0.1").is_err()); + } +} diff --git a/bridges/relays/substrate/src/rpc.rs b/bridges/relays/substrate/src/rpc.rs new file mode 100644 index 000000000000..575419e3c5f0 --- /dev/null +++ b/bridges/relays/substrate/src/rpc.rs @@ -0,0 +1,29 @@ +use jsonrpsee::core::client::{RawClient, RawClientError, TransportClient}; +use node_primitives::{BlockNumber, Hash, Header}; +use sp_core::Bytes; +use sp_rpc::number::NumberOrHex; + +jsonrpsee::rpc_api! { + pub SubstrateRPC { + #[rpc(method = "author_submitExtrinsic", positional_params)] + fn author_submit_extrinsic(extrinsic: Bytes) -> Hash; + + #[rpc(method = "chain_getFinalizedHead")] + fn chain_finalized_head() -> Hash; + + #[rpc(method = "chain_getBlockHash", positional_params)] + fn chain_block_hash(id: Option>) -> Option; + + #[rpc(method = "chain_getHeader", positional_params)] + fn chain_header(hash: Option) -> Option
; + + #[rpc(positional_params)] + fn state_call(name: String, bytes: Bytes, hash: Option) -> Bytes; + } +} + +pub async fn genesis_block_hash(client: &mut RawClient) + -> Result, RawClientError> +{ + SubstrateRPC::chain_block_hash(client, Some(NumberOrHex::Number(0))).await +}