diff --git a/bin/node/cli/benches/transaction_pool.rs b/bin/node/cli/benches/transaction_pool.rs new file mode 100644 index 0000000000000..4f5ccd6ea912f --- /dev/null +++ b/bin/node/cli/benches/transaction_pool.rs @@ -0,0 +1,274 @@ +// This file is part of Substrate. + +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput}; +use futures::{future, StreamExt}; +use node_cli::service::{create_extrinsic, fetch_nonce, FullClient, TransactionPool}; +use node_primitives::AccountId; +use node_runtime::{constants::currency::*, BalancesCall, SudoCall}; +use sc_client_api::execution_extensions::ExecutionStrategies; +use sc_service::{ + config::{ + DatabaseSource, KeepBlocks, KeystoreConfig, NetworkConfiguration, OffchainWorkerConfig, + PruningMode, TransactionPoolOptions, TransactionStorageMode, WasmExecutionMethod, + }, + BasePath, Configuration, Role, +}; +use sc_transaction_pool::PoolLimit; +use sc_transaction_pool_api::{TransactionPool as _, TransactionSource, TransactionStatus}; +use sp_core::{crypto::Pair, sr25519}; +use sp_keyring::Sr25519Keyring; +use sp_runtime::{generic::BlockId, OpaqueExtrinsic}; +use tokio::runtime::Handle; + +fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase { + let base_path = BasePath::new_temp_dir().expect("Creates base path"); + let root = base_path.path().to_path_buf(); + + let network_config = NetworkConfiguration::new( + Sr25519Keyring::Alice.to_seed(), + "network/test/0.1", + Default::default(), + None, + ); + + let spec = Box::new(node_cli::chain_spec::development_config()); + + let config = Configuration { + impl_name: "BenchmarkImpl".into(), + impl_version: "1.0".into(), + role: Role::Authority, + tokio_handle, + transaction_pool: TransactionPoolOptions { + ready: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 }, + future: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 }, + reject_future_transactions: false, + }, + network: network_config, + keystore: KeystoreConfig::InMemory, + keystore_remote: Default::default(), + database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 }, + state_cache_size: 67108864, + state_cache_child_ratio: None, + state_pruning: PruningMode::ArchiveAll, + keep_blocks: KeepBlocks::All, + transaction_storage: TransactionStorageMode::BlockBody, + chain_spec: spec, + wasm_method: WasmExecutionMethod::Interpreted, + // NOTE: we enforce the use of the native runtime to make the errors more debuggable + execution_strategies: ExecutionStrategies { + syncing: sc_client_api::ExecutionStrategy::NativeWhenPossible, + importing: sc_client_api::ExecutionStrategy::NativeWhenPossible, + block_construction: sc_client_api::ExecutionStrategy::NativeWhenPossible, + offchain_worker: sc_client_api::ExecutionStrategy::NativeWhenPossible, + other: sc_client_api::ExecutionStrategy::NativeWhenPossible, + }, + rpc_http: None, + rpc_ws: None, + rpc_ipc: None, + rpc_ws_max_connections: None, + rpc_cors: None, + rpc_methods: Default::default(), + rpc_max_payload: None, + ws_max_out_buffer_capacity: None, + prometheus_config: None, + telemetry_endpoints: None, + default_heap_pages: None, + offchain_worker: OffchainWorkerConfig { enabled: true, indexing_enabled: false }, + force_authoring: false, + disable_grandpa: false, + dev_key_seed: Some(Sr25519Keyring::Alice.to_seed()), + tracing_targets: None, + tracing_receiver: Default::default(), + max_runtime_instances: 8, + announce_block: true, + base_path: Some(base_path), + informant_output_format: Default::default(), + wasm_runtime_overrides: None, + }; + + node_cli::service::new_full_base(config, |_, _| ()).expect("Creates node") +} + +fn create_accounts(num: usize) -> Vec { + (0..num) + .map(|i| { + Pair::from_string(&format!("{}/{}", Sr25519Keyring::Alice.to_seed(), i), None) + .expect("Creates account pair") + }) + .collect() +} + +/// Create the extrinsics that will initialize the accounts from the sudo account (Alice). +/// +/// `start_nonce` is the current nonce of Alice. +fn create_account_extrinsics( + client: &FullClient, + accounts: &[sr25519::Pair], +) -> Vec { + let start_nonce = fetch_nonce(client, Sr25519Keyring::Alice.pair()); + + accounts + .iter() + .enumerate() + .map(|(i, a)| { + vec![ + // Reset the nonce by removing any funds + create_extrinsic( + client, + Sr25519Keyring::Alice.pair(), + SudoCall::sudo { + call: Box::new( + BalancesCall::set_balance { + who: AccountId::from(a.public()).into(), + new_free: 0, + new_reserved: 0, + } + .into(), + ), + }, + Some(start_nonce + (i as u32) * 2), + ), + // Give back funds + create_extrinsic( + client, + Sr25519Keyring::Alice.pair(), + SudoCall::sudo { + call: Box::new( + BalancesCall::set_balance { + who: AccountId::from(a.public()).into(), + new_free: 1_000_000 * DOLLARS, + new_reserved: 0, + } + .into(), + ), + }, + Some(start_nonce + (i as u32) * 2 + 1), + ), + ] + }) + .flatten() + .map(OpaqueExtrinsic::from) + .collect() +} + +fn create_benchmark_extrinsics( + client: &FullClient, + accounts: &[sr25519::Pair], + extrinsics_per_account: usize, +) -> Vec { + accounts + .iter() + .map(|account| { + (0..extrinsics_per_account).map(move |nonce| { + create_extrinsic( + client, + account.clone(), + BalancesCall::transfer { + dest: Sr25519Keyring::Bob.to_account_id().into(), + value: 1 * DOLLARS, + }, + Some(nonce as u32), + ) + }) + }) + .flatten() + .map(OpaqueExtrinsic::from) + .collect() +} + +async fn submit_tx_and_wait_for_inclusion( + tx_pool: &TransactionPool, + tx: OpaqueExtrinsic, + client: &FullClient, + wait_for_finalized: bool, +) { + let best_hash = client.chain_info().best_hash; + + let mut watch = tx_pool + .submit_and_watch(&BlockId::Hash(best_hash), TransactionSource::External, tx.clone()) + .await + .expect("Submits tx to pool") + .fuse(); + + loop { + match watch.select_next_some().await { + TransactionStatus::Finalized(_) => break, + TransactionStatus::InBlock(_) if !wait_for_finalized => break, + _ => {}, + } + } +} + +fn transaction_pool_benchmarks(c: &mut Criterion) { + sp_tracing::try_init_simple(); + + let runtime = tokio::runtime::Runtime::new().expect("Creates tokio runtime"); + let tokio_handle = runtime.handle().clone(); + + let node = new_node(tokio_handle.clone()); + + let account_num = 10; + let extrinsics_per_account = 2000; + let accounts = create_accounts(account_num); + + let mut group = c.benchmark_group("Transaction pool"); + + group.sample_size(10); + group.throughput(Throughput::Elements(account_num as u64 * extrinsics_per_account as u64)); + + let mut counter = 1; + group.bench_function( + format!("{} transfers from {} accounts", account_num * extrinsics_per_account, account_num), + move |b| { + b.iter_batched( + || { + let prepare_extrinsics = create_account_extrinsics(&*node.client, &accounts); + + runtime.block_on(future::join_all(prepare_extrinsics.into_iter().map(|tx| { + submit_tx_and_wait_for_inclusion( + &node.transaction_pool, + tx, + &*node.client, + true, + ) + }))); + + create_benchmark_extrinsics(&*node.client, &accounts, extrinsics_per_account) + }, + |extrinsics| { + runtime.block_on(future::join_all(extrinsics.into_iter().map(|tx| { + submit_tx_and_wait_for_inclusion( + &node.transaction_pool, + tx, + &*node.client, + false, + ) + }))); + + println!("Finished {}", counter); + counter += 1; + }, + BatchSize::SmallInput, + ) + }, + ); +} + +criterion_group!(benches, transaction_pool_benchmarks); +criterion_main!(benches); diff --git a/client/cli/src/commands/run_cmd.rs b/client/cli/src/commands/run_cmd.rs index 98f2090c6f446..eea6c10f66435 100644 --- a/client/cli/src/commands/run_cmd.rs +++ b/client/cli/src/commands/run_cmd.rs @@ -127,6 +127,10 @@ pub struct RunCmd { #[structopt(long = "ws-max-connections", value_name = "COUNT")] pub ws_max_connections: Option, + /// Set the the maximum WebSocket output buffer size in MiB. Default is 16. + #[structopt(long = "ws-max-out-buffer-capacity")] + pub ws_max_out_buffer_capacity: Option, + /// Specify browser Origins allowed to access the HTTP & WS RPC servers. /// /// A comma-separated list of origins (protocol://domain or special `null` @@ -432,6 +436,10 @@ impl CliConfiguration for RunCmd { Ok(self.rpc_max_payload) } + fn ws_max_out_buffer_capacity(&self) -> Result> { + Ok(self.ws_max_out_buffer_capacity) + } + fn transaction_pool(&self) -> Result { Ok(self.pool_config.transaction_pool()) } diff --git a/client/cli/src/config.rs b/client/cli/src/config.rs index 59fc6bd438a1c..c1816e16ae6c1 100644 --- a/client/cli/src/config.rs +++ b/client/cli/src/config.rs @@ -360,6 +360,11 @@ pub trait CliConfiguration: Sized { Ok(None) } + /// Get maximum WS output buffer capacity. + fn ws_max_out_buffer_capacity(&self) -> Result> { + Ok(None) + } + /// Get the prometheus configuration (`None` if disabled) /// /// By default this is `None`. @@ -513,6 +518,7 @@ pub trait CliConfiguration: Sized { rpc_ws_max_connections: self.rpc_ws_max_connections()?, rpc_cors: self.rpc_cors(is_dev)?, rpc_max_payload: self.rpc_max_payload()?, + ws_max_out_buffer_capacity: self.ws_max_out_buffer_capacity()?, prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?, telemetry_endpoints, default_heap_pages: self.default_heap_pages()?, diff --git a/client/rpc-servers/src/lib.rs b/client/rpc-servers/src/lib.rs index 65ed6a914b19a..1ac409d6ba89f 100644 --- a/client/rpc-servers/src/lib.rs +++ b/client/rpc-servers/src/lib.rs @@ -33,6 +33,9 @@ const MEGABYTE: usize = 1024 * 1024; /// Maximal payload accepted by RPC servers. pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE; +/// Maximal buffer size in WS server. +pub const WS_MAX_BUFFER_CAPACITY_DEFAULT: usize = 16 * MEGABYTE; + /// Default maximum number of connections for WS RPC servers. const WS_MAX_CONNECTIONS: usize = 100; @@ -172,18 +175,32 @@ pub fn start_ws< cors: Option<&Vec>, io: RpcHandler, maybe_max_payload_mb: Option, + maybe_max_out_buffer_capacity_mb: Option, server_metrics: ServerMetrics, tokio_handle: tokio::runtime::Handle, ) -> io::Result { - let rpc_max_payload = maybe_max_payload_mb + let max_payload = maybe_max_payload_mb .map(|mb| mb.saturating_mul(MEGABYTE)) .unwrap_or(RPC_MAX_PAYLOAD_DEFAULT); + let max_out_buffer_capacity = maybe_max_out_buffer_capacity_mb + .map(|mb| mb.saturating_mul(MEGABYTE)) + .unwrap_or(WS_MAX_BUFFER_CAPACITY_DEFAULT); + + if max_payload > max_out_buffer_capacity { + log::warn!( + "maximum payload ({}) is more than maximum output buffer ({}) size in ws server, the payload will actually be limited by the buffer size", + max_payload, + max_out_buffer_capacity, + ) + } + ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| { context.sender().into() }) .event_loop_executor(tokio_handle) - .max_payload(rpc_max_payload) + .max_payload(max_payload) .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) + .max_out_buffer_capacity(max_out_buffer_capacity) .allowed_origins(map_cors(cors)) .allowed_hosts(hosts_filtering(cors.is_some())) .session_stats(server_metrics) diff --git a/client/service/src/config.rs b/client/service/src/config.rs index a98a34b473cee..967d36feb58a8 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -97,6 +97,8 @@ pub struct Configuration { pub rpc_methods: RpcMethods, /// Maximum payload of rpc request/responses. pub rpc_max_payload: Option, + /// Maximum size of the output buffer capacity for websocket connections. + pub ws_max_out_buffer_capacity: Option, /// Prometheus endpoint configuration. `None` if disabled. pub prometheus_config: Option, /// Telemetry service URL. `None` if disabled. diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 7284747424aa9..8d8c54cc25f29 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -424,6 +424,7 @@ fn start_rpc_servers< ), )?, config.rpc_max_payload, + config.ws_max_out_buffer_capacity, server_metrics.clone(), config.tokio_handle.clone(), ) diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 8000c536cdf93..6a12c57f75afa 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -261,6 +261,7 @@ fn node_config< rpc_cors: None, rpc_methods: Default::default(), rpc_max_payload: None, + ws_max_out_buffer_capacity: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/test-utils/test-runner/src/utils.rs b/test-utils/test-runner/src/utils.rs index 8e8c84e6b4f8a..4937493d7750f 100644 --- a/test-utils/test-runner/src/utils.rs +++ b/test-utils/test-runner/src/utils.rs @@ -94,6 +94,7 @@ pub fn default_config(tokio_handle: Handle, mut chain_spec: Box) rpc_cors: None, rpc_methods: Default::default(), rpc_max_payload: None, + ws_max_out_buffer_capacity: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None,