diff --git a/Cargo.lock b/Cargo.lock index 7ea4e3e5c048e..397846693e907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4503,7 +4503,6 @@ dependencies = [ "platforms", "rand 0.8.5", "regex", - "remote-externalities", "sc-authority-discovery", "sc-basic-authorship", "sc-block-builder", @@ -4551,6 +4550,7 @@ dependencies = [ "sp-trie", "substrate-build-script-utils", "substrate-frame-cli", + "substrate-rpc-client", "tempfile", "tokio", "try-runtime-cli", @@ -7278,7 +7278,6 @@ version = "0.10.0-dev" dependencies = [ "env_logger", "frame-support", - "jsonrpsee", "log", "pallet-elections-phragmen", "parity-scale-codec", @@ -7288,6 +7287,7 @@ dependencies = [ "sp-io", "sp-runtime", "sp-version", + "substrate-rpc-client", "tokio", ] @@ -10349,6 +10349,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "substrate-rpc-client" +version = "0.10.0-dev" +dependencies = [ + "async-trait", + "jsonrpsee", + "log", + "sc-rpc-api", + "serde", + "sp-core", + "sp-runtime", + "tokio", +] + [[package]] name = "substrate-state-trie-migration-rpc" version = "4.0.0-dev" @@ -11028,7 +11042,6 @@ version = "0.10.0-dev" dependencies = [ "clap 4.0.11", "frame-try-runtime", - "jsonrpsee", "log", "parity-scale-codec", "remote-externalities", @@ -11045,6 +11058,7 @@ dependencies = [ "sp-state-machine", "sp-version", "sp-weights", + "substrate-rpc-client", "tokio", "zstd", ] diff --git a/Cargo.toml b/Cargo.toml index e203cbbee7e0d..d3c801fc2c7be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -232,6 +232,7 @@ members = [ "utils/frame/rpc/system", "utils/frame/generate-bags", "utils/frame/generate-bags/node-runtime", + "utils/frame/rpc/client", "utils/prometheus", "utils/wasm-builder", ] diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index 25e7ef095ed3e..4bf991f49320c 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -132,7 +132,7 @@ soketto = "0.7.1" criterion = { version = "0.3.5", features = ["async_tokio"] } tokio = { version = "1.17.0", features = ["macros", "time", "parking_lot"] } wait-timeout = "0.2" -remote-externalities = { path = "../../../utils/frame/remote-externalities" } +substrate-rpc-client = { path = "../../../utils/frame/rpc/client" } pallet-timestamp = { version = "4.0.0-dev", path = "../../../frame/timestamp" } [build-dependencies] diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index 3b83f4339611d..358c09779d59a 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -23,8 +23,7 @@ use nix::{ sys::signal::{kill, Signal::SIGINT}, unistd::Pid, }; -use node_primitives::Block; -use remote_externalities::rpc_api::RpcService; +use node_primitives::{Hash, Header}; use std::{ io::{BufRead, BufReader, Read}, ops::{Deref, DerefMut}, @@ -69,12 +68,14 @@ pub async fn wait_n_finalized_blocks( /// Wait for at least n blocks to be finalized from a specified node pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { + use substrate_rpc_client::{ws_client, ChainApi}; + let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); - let rpc_service = RpcService::new(url, false).await.unwrap(); + let rpc = ws_client(url).await.unwrap(); loop { - if let Ok(block) = rpc_service.get_finalized_head::().await { + if let Ok(block) = ChainApi::<(), Hash, Header, ()>::finalized_head(&rpc).await { built_blocks.insert(block); if built_blocks.len() > n { break diff --git a/frame/bags-list/remote-tests/src/migration.rs b/frame/bags-list/remote-tests/src/migration.rs index 675dfbe072670..b013472b4c90e 100644 --- a/frame/bags-list/remote-tests/src/migration.rs +++ b/frame/bags-list/remote-tests/src/migration.rs @@ -24,14 +24,15 @@ use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; /// Test voter bags migration. `currency_unit` is the number of planks per the the runtimes `UNITS` /// (i.e. number of decimal places per DOT, KSM etc) -pub async fn execute< - Runtime: RuntimeT, - Block: BlockT + DeserializeOwned, ->( +pub async fn execute( currency_unit: u64, currency_name: &'static str, ws_url: String, -) { +) where + Runtime: RuntimeT, + Block: BlockT, + Block::Header: DeserializeOwned, +{ let mut ext = Builder::::new() .mode(Mode::Online(OnlineConfig { transport: ws_url.to_string().into(), diff --git a/frame/bags-list/remote-tests/src/snapshot.rs b/frame/bags-list/remote-tests/src/snapshot.rs index 655de10c4af9b..cfe065924bd92 100644 --- a/frame/bags-list/remote-tests/src/snapshot.rs +++ b/frame/bags-list/remote-tests/src/snapshot.rs @@ -22,14 +22,12 @@ use remote_externalities::{Builder, Mode, OnlineConfig}; use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; /// Execute create a snapshot from pallet-staking. -pub async fn execute< +pub async fn execute(voter_limit: Option, currency_unit: u64, ws_url: String) +where Runtime: crate::RuntimeT, - Block: BlockT + DeserializeOwned, ->( - voter_limit: Option, - currency_unit: u64, - ws_url: String, -) { + Block: BlockT, + Block::Header: DeserializeOwned, +{ use frame_support::storage::generator::StorageMap; let mut ext = Builder::::new() diff --git a/frame/bags-list/remote-tests/src/try_state.rs b/frame/bags-list/remote-tests/src/try_state.rs index 9817ef4ceb9e4..d3fb63f045a64 100644 --- a/frame/bags-list/remote-tests/src/try_state.rs +++ b/frame/bags-list/remote-tests/src/try_state.rs @@ -25,14 +25,15 @@ use remote_externalities::{Builder, Mode, OnlineConfig}; use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; /// Execute the sanity check of the bags-list. -pub async fn execute< - Runtime: crate::RuntimeT, - Block: BlockT + DeserializeOwned, ->( +pub async fn execute( currency_unit: u64, currency_name: &'static str, ws_url: String, -) { +) where + Runtime: crate::RuntimeT, + Block: BlockT, + Block::Header: DeserializeOwned, +{ let mut ext = Builder::::new() .mode(Mode::Online(OnlineConfig { transport: ws_url.to_string().into(), diff --git a/frame/state-trie-migration/src/lib.rs b/frame/state-trie-migration/src/lib.rs index 3f81d3c211a95..5255d4f6f3800 100644 --- a/frame/state-trie-migration/src/lib.rs +++ b/frame/state-trie-migration/src/lib.rs @@ -1640,13 +1640,12 @@ pub(crate) mod remote_tests { /// /// This will print some very useful statistics, make sure [`crate::LOG_TARGET`] is enabled. #[allow(dead_code)] - pub(crate) async fn run_with_limits< + pub(crate) async fn run_with_limits(limits: MigrationLimits, mode: Mode) + where Runtime: crate::Config, - Block: BlockT + serde::de::DeserializeOwned, - >( - limits: MigrationLimits, - mode: Mode, - ) { + Block: BlockT, + Block::Header: serde::de::DeserializeOwned, + { let mut ext = remote_externalities::Builder::::new() .mode(mode) .state_version(sp_core::storage::StateVersion::V0) diff --git a/utils/frame/remote-externalities/Cargo.toml b/utils/frame/remote-externalities/Cargo.toml index 3121157df68d8..3d7471bf4d680 100644 --- a/utils/frame/remote-externalities/Cargo.toml +++ b/utils/frame/remote-externalities/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" license = "Apache-2.0" homepage = "https://substrate.io" repository = "https://github.com/paritytech/substrate/" -description = "An externalities provided environemnt that can load itself from remote nodes or cache files" +description = "An externalities provided environment that can load itself from remote nodes or cached files" readme = "README.md" [package.metadata.docs.rs] @@ -15,7 +15,6 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { package = "parity-scale-codec", version = "3.0.0" } env_logger = "0.9" -jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] } log = "0.4.17" serde = "1.0.136" serde_json = "1.0" @@ -24,6 +23,7 @@ sp-core = { version = "6.0.0", path = "../../../primitives/core" } sp-io = { version = "6.0.0", path = "../../../primitives/io" } sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } sp-version = { version = "5.0.0", path = "../../../primitives/version" } +substrate-rpc-client = { path = "../rpc/client" } [dev-dependencies] tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread"] } diff --git a/utils/frame/remote-externalities/src/lib.rs b/utils/frame/remote-externalities/src/lib.rs index 83481e745f5ee..86cfc767bf3b5 100644 --- a/utils/frame/remote-externalities/src/lib.rs +++ b/utils/frame/remote-externalities/src/lib.rs @@ -22,13 +22,6 @@ use codec::{Decode, Encode}; -use jsonrpsee::{ - core::{client::ClientT, Error as RpcError}, - proc_macros::rpc, - rpc_params, - ws_client::{WsClient, WsClientBuilder}, -}; - use log::*; use serde::de::DeserializeOwned; use sp_core::{ @@ -46,8 +39,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; - -pub mod rpc_api; +use substrate_rpc_client::{rpc_params, ws_client, ChainApi, ClientT, StateApi, WsClient}; type KeyValue = (StorageKey, StorageData); type TopKeyValues = Vec; @@ -58,40 +50,6 @@ const DEFAULT_TARGET: &str = "wss://rpc.polkadot.io:443"; const BATCH_SIZE: usize = 1000; const PAGE: u32 = 1000; -#[rpc(client)] -pub trait RpcApi { - #[method(name = "childstate_getKeys")] - fn child_get_keys( - &self, - child_key: PrefixedStorageKey, - prefix: StorageKey, - hash: Option, - ) -> Result, RpcError>; - - #[method(name = "childstate_getStorage")] - fn child_get_storage( - &self, - child_key: PrefixedStorageKey, - prefix: StorageKey, - hash: Option, - ) -> Result; - - #[method(name = "state_getStorage")] - fn get_storage(&self, prefix: StorageKey, hash: Option) -> Result; - - #[method(name = "state_getKeysPaged")] - fn get_keys_paged( - &self, - prefix: Option, - count: u32, - start_key: Option, - hash: Option, - ) -> Result, RpcError>; - - #[method(name = "chain_getFinalizedHead")] - fn finalized_head(&self) -> Result; -} - /// The execution mode. #[derive(Clone)] pub enum Mode { @@ -140,14 +98,10 @@ impl Transport { if let Self::Uri(uri) = self { log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri); - let ws_client = WsClientBuilder::default() - .max_request_body_size(u32::MAX) - .build(&uri) - .await - .map_err(|e| { - log::error!(target: LOG_TARGET, "error: {:?}", e); - "failed to build ws client" - })?; + let ws_client = ws_client(uri).await.map_err(|e| { + log::error!(target: LOG_TARGET, "error: {:?}", e); + "failed to build ws client" + })?; *self = Self::RemoteClient(Arc::new(ws_client)) } @@ -258,7 +212,7 @@ pub struct Builder { // NOTE: ideally we would use `DefaultNoBound` here, but not worth bringing in frame-support for // that. -impl Default for Builder { +impl Default for Builder { fn default() -> Self { Self { mode: Default::default(), @@ -272,7 +226,7 @@ impl Default for Builder { } // Mode methods -impl Builder { +impl Builder { fn as_online(&self) -> &OnlineConfig { match &self.mode { Mode::Online(config) => config, @@ -291,26 +245,38 @@ impl Builder { } // RPC methods -impl Builder { +impl Builder +where + B::Hash: DeserializeOwned, + B::Header: DeserializeOwned, +{ async fn rpc_get_storage( &self, key: StorageKey, maybe_at: Option, ) -> Result { trace!(target: LOG_TARGET, "rpc: get_storage"); - self.as_online().rpc_client().get_storage(key, maybe_at).await.map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); - "rpc get_storage failed." - }) + match self.as_online().rpc_client().storage(key, maybe_at).await { + Ok(Some(res)) => Ok(res), + Ok(None) => Err("get_storage not found"), + Err(e) => { + error!(target: LOG_TARGET, "Error = {:?}", e); + Err("rpc get_storage failed.") + }, + } } /// Get the latest finalized head. async fn rpc_get_head(&self) -> Result { trace!(target: LOG_TARGET, "rpc: finalized_head"); - self.as_online().rpc_client().finalized_head().await.map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); - "rpc finalized_head failed." - }) + + // sadly this pretty much unreadable... + ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client()) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "Error = {:?}", e); + "rpc finalized_head failed." + }) } /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. @@ -325,7 +291,7 @@ impl Builder { let page = self .as_online() .rpc_client() - .get_keys_paged(Some(prefix.clone()), PAGE, last_key.clone(), Some(at)) + .storage_keys_paged(Some(prefix.clone()), PAGE, last_key.clone(), Some(at)) .await .map_err(|e| { error!(target: LOG_TARGET, "Error = {:?}", e); @@ -471,19 +437,19 @@ impl Builder { child_prefix: StorageKey, at: B::Hash, ) -> Result, &'static str> { - let child_keys = self - .as_online() - .rpc_client() - .child_get_keys( - PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()), - child_prefix, - Some(at), - ) - .await - .map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); - "rpc child_get_keys failed." - })?; + // This is deprecated and will generate a warning which causes the CI to fail. + #[allow(warnings)] + let child_keys = substrate_rpc_client::ChildStateApi::storage_keys( + self.as_online().rpc_client(), + PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()), + child_prefix, + Some(at), + ) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "Error = {:?}", e); + "rpc child_get_keys failed." + })?; debug!( target: LOG_TARGET, @@ -497,7 +463,11 @@ impl Builder { } // Internal methods -impl Builder { +impl Builder +where + B::Hash: DeserializeOwned, + B::Header: DeserializeOwned, +{ /// Save the given data to the top keys snapshot. fn save_top_snapshot(&self, data: &[KeyValue], path: &PathBuf) -> Result<(), &'static str> { let mut path = path.clone(); @@ -726,12 +696,13 @@ impl Builder { let child_kv = match self.mode.clone() { Mode::Online(_) => self.load_child_remote_and_maybe_save(&top_kv).await?, - Mode::OfflineOrElseOnline(offline_config, _) => + Mode::OfflineOrElseOnline(offline_config, _) => { if let Ok(kv) = self.load_child_snapshot(&offline_config.state_snapshot.path) { kv } else { self.load_child_remote_and_maybe_save(&top_kv).await? - }, + } + }, Mode::Offline(ref config) => self .load_child_snapshot(&config.state_snapshot.path) .map_err(|why| { @@ -749,7 +720,7 @@ impl Builder { } // Public methods -impl Builder { +impl Builder { /// Create a new builder. pub fn new() -> Self { Default::default() @@ -824,7 +795,13 @@ impl Builder { } self } +} +// Public methods +impl Builder +where + B::Header: DeserializeOwned, +{ /// Build the test externalities. pub async fn build(self) -> Result { let state_version = self.state_version; diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs deleted file mode 100644 index 3ea30a30221a2..0000000000000 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ /dev/null @@ -1,149 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: Apache-2.0 - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! WS RPC API for one off RPC calls to a substrate node. -// TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988 - -use jsonrpsee::{ - core::client::{Client, ClientT}, - rpc_params, - types::ParamsSer, - ws_client::{WsClient, WsClientBuilder}, -}; -use serde::de::DeserializeOwned; -use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; -use std::sync::Arc; - -enum RpcCall { - GetHeader, - GetFinalizedHead, - GetBlock, - GetRuntimeVersion, -} - -impl RpcCall { - fn as_str(&self) -> &'static str { - match self { - RpcCall::GetHeader => "chain_getHeader", - RpcCall::GetFinalizedHead => "chain_getFinalizedHead", - RpcCall::GetBlock => "chain_getBlock", - RpcCall::GetRuntimeVersion => "state_getRuntimeVersion", - } - } -} - -/// General purpose method for making RPC calls. -async fn make_request<'a, T: DeserializeOwned>( - client: &Arc, - call: RpcCall, - params: Option>, -) -> Result { - client - .request::(call.as_str(), params) - .await - .map_err(|e| format!("{} request failed: {:?}", call.as_str(), e)) -} - -enum ConnectionPolicy { - Reuse(Arc), - Reconnect, -} - -/// Simple RPC service that is capable of keeping the connection. -/// -/// Service will connect to `uri` for the first time already during initialization. -/// -/// Be careful with reusing the connection in a multithreaded environment. -pub struct RpcService { - uri: String, - policy: ConnectionPolicy, -} - -impl RpcService { - /// Creates a new RPC service. If `keep_connection`, then connects to `uri` right away. - pub async fn new>(uri: S, keep_connection: bool) -> Result { - let policy = if keep_connection { - ConnectionPolicy::Reuse(Arc::new(Self::build_client(uri.as_ref()).await?)) - } else { - ConnectionPolicy::Reconnect - }; - Ok(Self { uri: uri.as_ref().to_string(), policy }) - } - - /// Returns the address at which requests are sent. - pub fn uri(&self) -> String { - self.uri.clone() - } - - /// Build a websocket client that connects to `self.uri`. - async fn build_client>(uri: S) -> Result { - WsClientBuilder::default() - .max_request_body_size(u32::MAX) - .build(uri) - .await - .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) - } - - /// Generic method for making RPC requests. - async fn make_request<'a, T: DeserializeOwned>( - &self, - call: RpcCall, - params: Option>, - ) -> Result { - match self.policy { - // `self.keep_connection` must have been `true`. - ConnectionPolicy::Reuse(ref client) => make_request(client, call, params).await, - ConnectionPolicy::Reconnect => { - let client = Arc::new(Self::build_client(&self.uri).await?); - make_request(&client, call, params).await - }, - } - } - - /// Get the header of the block identified by `at`. - pub async fn get_header(&self, at: Block::Hash) -> Result - where - Block: BlockT, - Block::Header: DeserializeOwned, - { - self.make_request(RpcCall::GetHeader, rpc_params!(at)).await - } - - /// Get the finalized head. - pub async fn get_finalized_head(&self) -> Result { - self.make_request(RpcCall::GetFinalizedHead, None).await - } - - /// Get the signed block identified by `at`. - pub async fn get_block( - &self, - at: Block::Hash, - ) -> Result { - Ok(self - .make_request::>(RpcCall::GetBlock, rpc_params!(at)) - .await? - .block) - } - - /// Get the runtime version of a given chain. - pub async fn get_runtime_version( - &self, - at: Option, - ) -> Result { - self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await - } -} diff --git a/utils/frame/rpc/client/Cargo.toml b/utils/frame/rpc/client/Cargo.toml new file mode 100644 index 0000000000000..80aa60f199f1f --- /dev/null +++ b/utils/frame/rpc/client/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "substrate-rpc-client" +version = "0.10.0-dev" +authors = ["Parity Technologies "] +edition = "2021" +license = "Apache-2.0" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +description = "Shared JSON-RPC client" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +jsonrpsee = { version = "0.15.1", features = ["ws-client"] } +sc-rpc-api = { version = "0.10.0-dev", path = "../../../../client/rpc-api" } +async-trait = "0.1.57" +serde = "1" +sp-runtime = { version = "6.0.0", path = "../../../../primitives/runtime" } +log = "0.4" + +[dev-dependencies] +tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread", "sync"] } +sp-core = { path = "../../../../primitives/core" } \ No newline at end of file diff --git a/utils/frame/rpc/client/src/lib.rs b/utils/frame/rpc/client/src/lib.rs new file mode 100644 index 0000000000000..254cc193c0e67 --- /dev/null +++ b/utils/frame/rpc/client/src/lib.rs @@ -0,0 +1,265 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Shared JSON-RPC client related code and abstractions. +//! +//! It exposes a `WebSocket JSON-RPC` client that implements the RPC interface in [`sc-rpc-api`] +//! along with some abstractions. +//! +//! ## Usage +//! +//! ```no_run +//! # use substrate_rpc_client::{ws_client, StateApi}; +//! # use sp_core::{H256, storage::StorageKey}; +//! +//! #[tokio::main] +//! async fn main() { +//! +//! let client = ws_client("ws://127.0.0.1:9944").await.unwrap(); +//! client.storage(StorageKey(vec![]), Some(H256::zero())).await.unwrap(); +//! +//! // if all type params are not known you need to provide type params +//! StateApi::::storage(&client, StorageKey(vec![]), None).await.unwrap(); +//! } +//! ``` + +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::collections::VecDeque; + +pub use jsonrpsee::{ + core::client::{ClientT, Subscription, SubscriptionClientT}, + rpc_params, + ws_client::{WsClient, WsClientBuilder}, +}; +pub use sc_rpc_api::{ + author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi, + child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi, + offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi, + system::SystemApiClient as SystemApi, +}; + +/// Create a new `WebSocket` connection with shared settings. +pub async fn ws_client(uri: impl AsRef) -> Result { + WsClientBuilder::default() + .max_request_body_size(u32::MAX) + .request_timeout(std::time::Duration::from_secs(60 * 10)) + .connection_timeout(std::time::Duration::from_secs(60)) + .max_notifs_per_subscription(1024) + .build(uri) + .await + .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) +} + +/// Abstraction over RPC calling for headers. +#[async_trait] +pub trait HeaderProvider +where + Block::Header: HeaderT, +{ + /// Awaits for the header of the block with hash `hash`. + async fn get_header(&self, hash: Block::Hash) -> Block::Header; +} + +#[async_trait] +impl HeaderProvider for WsClient +where + Block::Header: DeserializeOwned, +{ + async fn get_header(&self, hash: Block::Hash) -> Block::Header { + ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash)) + .await + .unwrap() + .unwrap() + } +} + +/// Abstraction over RPC subscription for finalized headers. +#[async_trait] +pub trait HeaderSubscription +where + Block::Header: HeaderT, +{ + /// Await for the next finalized header from the subscription. + /// + /// Returns `None` if either the subscription has been closed or there was an error when reading + /// an object from the client. + async fn next_header(&mut self) -> Option; +} + +#[async_trait] +impl HeaderSubscription for Subscription +where + Block::Header: DeserializeOwned, +{ + async fn next_header(&mut self) -> Option { + match self.next().await { + Some(Ok(header)) => Some(header), + None => { + log::warn!("subscription closed"); + None + }, + Some(Err(why)) => { + log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why); + None + }, + } + } +} + +/// Stream of all finalized headers. +/// +/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of +/// them lack justification). +pub struct FinalizedHeaders< + 'a, + Block: BlockT, + HP: HeaderProvider, + HS: HeaderSubscription, +> { + header_provider: &'a HP, + subscription: HS, + fetched_headers: VecDeque, + last_returned: Option<::Hash>, +} + +impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> + FinalizedHeaders<'a, Block, HP, HS> +where + ::Header: DeserializeOwned, +{ + pub fn new(header_provider: &'a HP, subscription: HS) -> Self { + Self { + header_provider, + subscription, + fetched_headers: VecDeque::new(), + last_returned: None, + } + } + + /// Reads next finalized header from the subscription. If some headers (without justification) + /// have been skipped, fetches them as well. Returns number of headers that have been fetched. + /// + /// All fetched headers are stored in `self.fetched_headers`. + async fn fetch(&mut self) -> usize { + let last_finalized = match self.subscription.next_header().await { + Some(header) => header, + None => return 0, + }; + + self.fetched_headers.push_front(last_finalized.clone()); + + let mut last_finalized_parent = *last_finalized.parent_hash(); + let last_returned = self.last_returned.unwrap_or(last_finalized_parent); + + while last_finalized_parent != last_returned { + let parent_header = self.header_provider.get_header(last_finalized_parent).await; + self.fetched_headers.push_front(parent_header.clone()); + last_finalized_parent = *parent_header.parent_hash(); + } + + self.fetched_headers.len() + } + + /// Get the next finalized header. + pub async fn next(&mut self) -> Option { + if self.fetched_headers.is_empty() { + self.fetch().await; + } + + if let Some(header) = self.fetched_headers.pop_front() { + self.last_returned = Some(header.hash()); + Some(header) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header, H256}; + use std::sync::Arc; + use tokio::sync::Mutex; + + type Block = TBlock>; + type BlockNumber = u64; + type Hash = H256; + + struct MockHeaderProvider(pub Arc>>); + + fn headers() -> Vec
{ + let mut headers = vec![Header::new_from_number(0)]; + for n in 1..11 { + headers.push(Header { + parent_hash: headers.last().unwrap().hash(), + ..Header::new_from_number(n) + }) + } + headers + } + + #[async_trait] + impl HeaderProvider for MockHeaderProvider { + async fn get_header(&self, _hash: Hash) -> Header { + let height = self.0.lock().await.pop_front().unwrap(); + headers()[height as usize].clone() + } + } + + struct MockHeaderSubscription(pub VecDeque); + + #[async_trait] + impl HeaderSubscription for MockHeaderSubscription { + async fn next_header(&mut self) -> Option
{ + self.0.pop_front().map(|h| headers()[h as usize].clone()) + } + } + + #[tokio::test] + async fn finalized_headers_works_when_every_block_comes_from_subscription() { + let heights = vec![4, 5, 6, 7]; + + let provider = MockHeaderProvider(Default::default()); + let subscription = MockHeaderSubscription(heights.clone().into()); + let mut headers = FinalizedHeaders::new(&provider, subscription); + + for h in heights { + assert_eq!(h, headers.next().await.unwrap().number); + } + assert_eq!(None, headers.next().await); + } + + #[tokio::test] + async fn finalized_headers_come_from_subscription_and_provider_if_in_need() { + let all_heights = 3..11; + let heights_in_subscription = vec![3, 4, 6, 10]; + // Consecutive headers will be requested in the reversed order. + let heights_not_in_subscription = vec![5, 9, 8, 7]; + + let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into()))); + let subscription = MockHeaderSubscription(heights_in_subscription.into()); + let mut headers = FinalizedHeaders::new(&provider, subscription); + + for h in all_heights { + assert_eq!(h, headers.next().await.unwrap().number); + } + assert_eq!(None, headers.next().await); + } +} diff --git a/utils/frame/try-runtime/cli/Cargo.toml b/utils/frame/try-runtime/cli/Cargo.toml index 8a9d5a5f8979a..24d64d4aa4373 100644 --- a/utils/frame/try-runtime/cli/Cargo.toml +++ b/utils/frame/try-runtime/cli/Cargo.toml @@ -19,7 +19,6 @@ parity-scale-codec = "3.0.0" serde = "1.0.136" zstd = { version = "0.11.2", default-features = false } remote-externalities = { version = "0.10.0-dev", path = "../../remote-externalities" } -jsonrpsee = { version = "0.15.1", default-features = false, features = ["ws-client"] } sc-chain-spec = { version = "4.0.0-dev", path = "../../../../client/chain-spec" } sc-cli = { version = "0.10.0-dev", path = "../../../../client/cli" } sc-executor = { version = "0.10.0-dev", path = "../../../../client/executor" } @@ -33,6 +32,7 @@ sp-state-machine = { version = "0.12.0", path = "../../../../primitives/state-ma sp-version = { version = "5.0.0", path = "../../../../primitives/version" } sp-weights = { version = "4.0.0", path = "../../../../primitives/weights" } frame-try-runtime = { path = "../../../../frame/try-runtime" } +substrate-rpc-client = { path = "../../rpc/client" } [dev-dependencies] tokio = "1.17.0" diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index df8c51b5a92d4..56d88b9cb8919 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -20,11 +20,11 @@ use crate::{ state_machine_call_with_proof, SharedParams, State, LOG_TARGET, }; use parity_scale_codec::Encode; -use remote_externalities::rpc_api; use sc_service::{Configuration, NativeExecutionDispatch}; use sp_core::storage::well_known_keys; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{fmt::Debug, str::FromStr}; +use substrate_rpc_client::{ws_client, ChainApi}; /// Configurations of the [`Command::ExecuteBlock`]. /// @@ -84,10 +84,11 @@ pub struct ExecuteBlockCmd { impl ExecuteBlockCmd { async fn block_at(&self, ws_uri: String) -> sc_cli::Result where - Block::Hash: FromStr, + Block::Hash: FromStr + serde::de::DeserializeOwned, ::Err: Debug, + Block::Header: serde::de::DeserializeOwned, { - let rpc_service = rpc_api::RpcService::new(ws_uri, false).await?; + let rpc = ws_client(&ws_uri).await?; match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), @@ -100,7 +101,9 @@ impl ExecuteBlockCmd { target: LOG_TARGET, "No --block-at or --at provided, using the latest finalized block instead" ); - rpc_service.get_finalized_head::().await.map_err(Into::into) + ChainApi::<(), Block::Hash, Block::Header, ()>::finalized_head(&rpc) + .await + .map_err(|e| e.to_string().into()) }, (None, State::Live { at: Some(at), .. }) => hash_of::(at), _ => { @@ -137,6 +140,8 @@ where Block: BlockT + serde::de::DeserializeOwned, Block::Hash: FromStr, ::Err: Debug, + Block::Hash: serde::de::DeserializeOwned, + Block::Header: serde::de::DeserializeOwned, NumberFor: FromStr, as FromStr>::Err: Debug, ExecDispatch: NativeExecutionDispatch + 'static, @@ -146,8 +151,11 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await?; - let block: Block = rpc_service.get_block::(block_at).await?; + let rpc = ws_client(&block_ws_uri).await?; + let block: Block = ChainApi::<(), Block::Hash, Block::Header, _>::block(&rpc, Some(block_at)) + .await + .unwrap() + .unwrap(); let parent_hash = block.header().parent_hash(); log::info!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index df6d40b2b8ab1..1cc371c8f22fd 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -19,22 +19,15 @@ use crate::{ build_executor, ensure_matching_spec, extract_code, full_extensions, local_spec, parse, state_machine_call_with_proof, SharedParams, LOG_TARGET, }; -use jsonrpsee::{ - core::{ - async_trait, - client::{Client, Subscription, SubscriptionClientT}, - }, - ws_client::WsClientBuilder, -}; use parity_scale_codec::{Decode, Encode}; -use remote_externalities::{rpc_api::RpcService, Builder, Mode, OnlineConfig}; +use remote_externalities::{Builder, Mode, OnlineConfig}; use sc_executor::NativeExecutionDispatch; use sc_service::Configuration; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use sp_weights::Weight; -use std::{collections::VecDeque, fmt::Debug, str::FromStr}; +use std::{fmt::Debug, str::FromStr}; +use substrate_rpc_client::{ws_client, ChainApi, FinalizedHeaders, Subscription, WsClient}; const SUB: &str = "chain_subscribeFinalizedHeads"; const UN_SUB: &str = "chain_unsubscribeFinalizedHeads"; @@ -71,144 +64,19 @@ pub struct FollowChainCmd { /// /// Returns a pair `(client, subscription)` - `subscription` alone will be useless, because it /// relies on the related alive `client`. -async fn start_subscribing( +async fn start_subscribing( url: &str, -) -> sc_cli::Result<(Client, Subscription
)> { - let client = WsClientBuilder::default() - .connection_timeout(std::time::Duration::new(20, 0)) - .max_notifs_per_subscription(1024) - .max_request_body_size(u32::MAX) - .build(url) - .await - .map_err(|e| sc_cli::Error::Application(e.into()))?; +) -> sc_cli::Result<(WsClient, Subscription
)> { + let client = ws_client(url).await.map_err(|e| sc_cli::Error::Application(e.into()))?; log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB); - let sub = client - .subscribe(SUB, None, UN_SUB) + let sub = ChainApi::<(), (), Header, ()>::subscribe_finalized_heads(&client) .await .map_err(|e| sc_cli::Error::Application(e.into()))?; Ok((client, sub)) } -/// Abstraction over RPC calling for headers. -#[async_trait] -trait HeaderProvider -where - Block::Header: HeaderT, -{ - /// Awaits for the header of the block with hash `hash`. - async fn get_header(&self, hash: Block::Hash) -> Block::Header; -} - -#[async_trait] -impl HeaderProvider for RpcService -where - Block::Header: DeserializeOwned, -{ - async fn get_header(&self, hash: Block::Hash) -> Block::Header { - self.get_header::(hash).await.unwrap() - } -} - -/// Abstraction over RPC subscription for finalized headers. -#[async_trait] -trait HeaderSubscription -where - Block::Header: HeaderT, -{ - /// Await for the next finalized header from the subscription. - /// - /// Returns `None` if either the subscription has been closed or there was an error when reading - /// an object from the client. - async fn next_header(&mut self) -> Option; -} - -#[async_trait] -impl HeaderSubscription for Subscription -where - Block::Header: DeserializeOwned, -{ - async fn next_header(&mut self) -> Option { - match self.next().await { - Some(Ok(header)) => Some(header), - None => { - log::warn!("subscription closed"); - None - }, - Some(Err(why)) => { - log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why); - None - }, - } - } -} - -/// Stream of all finalized headers. -/// -/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of -/// them lack justification). -struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> -{ - header_provider: &'a HP, - subscription: HS, - fetched_headers: VecDeque, - last_returned: Option<::Hash>, -} - -impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> - FinalizedHeaders<'a, Block, HP, HS> -where - ::Header: DeserializeOwned, -{ - pub fn new(header_provider: &'a HP, subscription: HS) -> Self { - Self { - header_provider, - subscription, - fetched_headers: VecDeque::new(), - last_returned: None, - } - } - - /// Reads next finalized header from the subscription. If some headers (without justification) - /// have been skipped, fetches them as well. Returns number of headers that have been fetched. - /// - /// All fetched headers are stored in `self.fetched_headers`. - async fn fetch(&mut self) -> usize { - let last_finalized = match self.subscription.next_header().await { - Some(header) => header, - None => return 0, - }; - - self.fetched_headers.push_front(last_finalized.clone()); - - let mut last_finalized_parent = *last_finalized.parent_hash(); - let last_returned = self.last_returned.unwrap_or(last_finalized_parent); - - while last_finalized_parent != last_returned { - let parent_header = self.header_provider.get_header(last_finalized_parent).await; - self.fetched_headers.push_front(parent_header.clone()); - last_finalized_parent = *parent_header.parent_hash(); - } - - self.fetched_headers.len() - } - - /// Get the next finalized header. - pub async fn next(&mut self) -> Option { - if self.fetched_headers.is_empty() { - self.fetch().await; - } - - if let Some(header) = self.fetched_headers.pop_front() { - self.last_returned = Some(header.hash()); - Some(header) - } else { - None - } - } -} - pub(crate) async fn follow_chain( shared: SharedParams, command: FollowChainCmd, @@ -224,22 +92,23 @@ where ExecDispatch: NativeExecutionDispatch + 'static, { let mut maybe_state_ext = None; - let (_client, subscription) = start_subscribing::(&command.uri).await?; + let (rpc, subscription) = start_subscribing::(&command.uri).await?; let (code_key, code) = extract_code(&config.chain_spec)?; let executor = build_executor::(&shared, &config); let execution = shared.execution; - let rpc_service = RpcService::new(&command.uri, command.keep_connection).await?; - - let mut finalized_headers: FinalizedHeaders> = - FinalizedHeaders::new(&rpc_service, subscription); + let mut finalized_headers: FinalizedHeaders = + FinalizedHeaders::new(&rpc, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); let number = header.number(); - let block = rpc_service.get_block::(hash).await.unwrap(); + let block: Block = ChainApi::<(), Block::Hash, Block::Header, _>::block(&rpc, Some(hash)) + .await + .unwrap() + .unwrap(); log::debug!( target: LOG_TARGET, @@ -295,7 +164,7 @@ where full_extensions(), )?; - let consumed_weight = ::decode(&mut &*encoded_result) + let consumed_weight = ::decode(&mut &*encoded_result) .map_err(|e| format!("failed to decode weight: {:?}", e))?; let storage_changes = changes @@ -326,76 +195,3 @@ where log::error!(target: LOG_TARGET, "ws subscription must have terminated."); Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; - use std::sync::Arc; - use tokio::sync::Mutex; - - type Block = TBlock>; - type BlockNumber = u64; - type Hash = H256; - - struct MockHeaderProvider(pub Arc>>); - - fn headers() -> Vec
{ - let mut headers = vec![Header::new_from_number(0)]; - for n in 1..11 { - headers.push(Header { - parent_hash: headers.last().unwrap().hash(), - ..Header::new_from_number(n) - }) - } - headers - } - - #[async_trait] - impl HeaderProvider for MockHeaderProvider { - async fn get_header(&self, _hash: Hash) -> Header { - let height = self.0.lock().await.pop_front().unwrap(); - headers()[height as usize].clone() - } - } - - struct MockHeaderSubscription(pub VecDeque); - - #[async_trait] - impl HeaderSubscription for MockHeaderSubscription { - async fn next_header(&mut self) -> Option
{ - self.0.pop_front().map(|h| headers()[h as usize].clone()) - } - } - - #[tokio::test] - async fn finalized_headers_works_when_every_block_comes_from_subscription() { - let heights = vec![4, 5, 6, 7]; - - let provider = MockHeaderProvider(Default::default()); - let subscription = MockHeaderSubscription(heights.clone().into()); - let mut headers = FinalizedHeaders::new(&provider, subscription); - - for h in heights { - assert_eq!(h, headers.next().await.unwrap().number); - } - assert_eq!(None, headers.next().await); - } - - #[tokio::test] - async fn finalized_headers_come_from_subscription_and_provider_if_in_need() { - let all_heights = 3..11; - let heights_in_subscription = vec![3, 4, 6, 10]; - // Consecutive headers will be requested in the reversed order. - let heights_not_in_subscription = vec![5, 9, 8, 7]; - - let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into()))); - let subscription = MockHeaderSubscription(heights_in_subscription.into()); - let mut headers = FinalizedHeaders::new(&provider, subscription); - - for h in all_heights { - assert_eq!(h, headers.next().await.unwrap().number); - } - assert_eq!(None, headers.next().await); - } -} diff --git a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index 6f48d1f8f68a7..8d2585372b4a8 100644 --- a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -20,12 +20,12 @@ use crate::{ parse, state_machine_call, SharedParams, State, LOG_TARGET, }; use parity_scale_codec::Encode; -use remote_externalities::rpc_api; use sc_executor::NativeExecutionDispatch; use sc_service::Configuration; use sp_core::storage::well_known_keys; use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; use std::{fmt::Debug, str::FromStr}; +use substrate_rpc_client::{ws_client, ChainApi}; /// Configurations of the [`Command::OffchainWorker`]. #[derive(Debug, Clone, clap::Parser)] @@ -117,8 +117,11 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await?; - let header = rpc_service.get_header::(header_at).await?; + let rpc = ws_client(&header_ws_uri).await?; + let header = ChainApi::<(), Block::Hash, Block::Header, ()>::header(&rpc, Some(header_at)) + .await + .unwrap() + .unwrap(); log::info!( target: LOG_TARGET, "fetched header from {:?}, block number: {:?}", diff --git a/utils/frame/try-runtime/cli/src/commands/on_runtime_upgrade.rs b/utils/frame/try-runtime/cli/src/commands/on_runtime_upgrade.rs index 5e11123526532..fba34ddfb5060 100644 --- a/utils/frame/try-runtime/cli/src/commands/on_runtime_upgrade.rs +++ b/utils/frame/try-runtime/cli/src/commands/on_runtime_upgrade.rs @@ -45,6 +45,7 @@ where Block: BlockT + serde::de::DeserializeOwned, Block::Hash: FromStr, ::Err: Debug, + Block::Header: serde::de::DeserializeOwned, NumberFor: FromStr, as FromStr>::Err: Debug, ExecDispatch: NativeExecutionDispatch + 'static, diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 9034f356a9f67..0732c8618fc15 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -267,8 +267,7 @@ use parity_scale_codec::Decode; use remote_externalities::{ - rpc_api::RpcService, Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, - TestExternalities, + Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, TestExternalities, }; use sc_chain_spec::ChainSpec; use sc_cli::{ @@ -297,6 +296,7 @@ use sp_runtime::{ use sp_state_machine::{OverlayedChanges, StateMachine, TrieBackendBuilder}; use sp_version::StateVersion; use std::{fmt::Debug, path::PathBuf, str::FromStr}; +use substrate_rpc_client::{ws_client, StateApi}; mod commands; pub(crate) mod parse; @@ -633,9 +633,8 @@ pub(crate) async fn ensure_matching_spec( expected_spec_version: u32, relaxed: bool, ) { - let rpc_service = RpcService::new(uri.clone(), false).await.unwrap(); - match rpc_service - .get_runtime_version::(None) + let rpc = ws_client(&uri).await.unwrap(); + match StateApi::::runtime_version(&rpc, None) .await .map(|version| (String::from(version.spec_name.clone()), version.spec_version)) .map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version))