Skip to content

Commit

Permalink
make the ws buffer size configurable (paritytech#10013)
Browse files Browse the repository at this point in the history
* make the ws buffer size configurable

* Update client/cli/src/commands/run_cmd.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update client/cli/src/commands/run_cmd.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update client/cli/src/commands/run_cmd.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Final touches

* Apply suggestions from code review

* fix bench

* remove in buffer

* Apply suggestions from code review

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
3 people authored and NikVolf committed Oct 22, 2021
1 parent bb32803 commit a4b7b67
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 2 deletions.
274 changes: 274 additions & 0 deletions bin/node/cli/benches/transaction_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// This file is part of Substrate.

// Copyright (C) 2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use futures::{future, StreamExt};
use node_cli::service::{create_extrinsic, fetch_nonce, FullClient, TransactionPool};
use node_primitives::AccountId;
use node_runtime::{constants::currency::*, BalancesCall, SudoCall};
use sc_client_api::execution_extensions::ExecutionStrategies;
use sc_service::{
config::{
DatabaseSource, KeepBlocks, KeystoreConfig, NetworkConfiguration, OffchainWorkerConfig,
PruningMode, TransactionPoolOptions, TransactionStorageMode, WasmExecutionMethod,
},
BasePath, Configuration, Role,
};
use sc_transaction_pool::PoolLimit;
use sc_transaction_pool_api::{TransactionPool as _, TransactionSource, TransactionStatus};
use sp_core::{crypto::Pair, sr25519};
use sp_keyring::Sr25519Keyring;
use sp_runtime::{generic::BlockId, OpaqueExtrinsic};
use tokio::runtime::Handle;

fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
let base_path = BasePath::new_temp_dir().expect("Creates base path");
let root = base_path.path().to_path_buf();

let network_config = NetworkConfiguration::new(
Sr25519Keyring::Alice.to_seed(),
"network/test/0.1",
Default::default(),
None,
);

let spec = Box::new(node_cli::chain_spec::development_config());

let config = Configuration {
impl_name: "BenchmarkImpl".into(),
impl_version: "1.0".into(),
role: Role::Authority,
tokio_handle,
transaction_pool: TransactionPoolOptions {
ready: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 },
future: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 },
reject_future_transactions: false,
},
network: network_config,
keystore: KeystoreConfig::InMemory,
keystore_remote: Default::default(),
database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
state_cache_size: 67108864,
state_cache_child_ratio: None,
state_pruning: PruningMode::ArchiveAll,
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
chain_spec: spec,
wasm_method: WasmExecutionMethod::Interpreted,
// NOTE: we enforce the use of the native runtime to make the errors more debuggable
execution_strategies: ExecutionStrategies {
syncing: sc_client_api::ExecutionStrategy::NativeWhenPossible,
importing: sc_client_api::ExecutionStrategy::NativeWhenPossible,
block_construction: sc_client_api::ExecutionStrategy::NativeWhenPossible,
offchain_worker: sc_client_api::ExecutionStrategy::NativeWhenPossible,
other: sc_client_api::ExecutionStrategy::NativeWhenPossible,
},
rpc_http: None,
rpc_ws: None,
rpc_ipc: None,
rpc_ws_max_connections: None,
rpc_cors: None,
rpc_methods: Default::default(),
rpc_max_payload: None,
ws_max_out_buffer_capacity: None,
prometheus_config: None,
telemetry_endpoints: None,
default_heap_pages: None,
offchain_worker: OffchainWorkerConfig { enabled: true, indexing_enabled: false },
force_authoring: false,
disable_grandpa: false,
dev_key_seed: Some(Sr25519Keyring::Alice.to_seed()),
tracing_targets: None,
tracing_receiver: Default::default(),
max_runtime_instances: 8,
announce_block: true,
base_path: Some(base_path),
informant_output_format: Default::default(),
wasm_runtime_overrides: None,
};

node_cli::service::new_full_base(config, |_, _| ()).expect("Creates node")
}

fn create_accounts(num: usize) -> Vec<sr25519::Pair> {
(0..num)
.map(|i| {
Pair::from_string(&format!("{}/{}", Sr25519Keyring::Alice.to_seed(), i), None)
.expect("Creates account pair")
})
.collect()
}

/// Create the extrinsics that will initialize the accounts from the sudo account (Alice).
///
/// `start_nonce` is the current nonce of Alice.
fn create_account_extrinsics(
client: &FullClient,
accounts: &[sr25519::Pair],
) -> Vec<OpaqueExtrinsic> {
let start_nonce = fetch_nonce(client, Sr25519Keyring::Alice.pair());

accounts
.iter()
.enumerate()
.map(|(i, a)| {
vec![
// Reset the nonce by removing any funds
create_extrinsic(
client,
Sr25519Keyring::Alice.pair(),
SudoCall::sudo {
call: Box::new(
BalancesCall::set_balance {
who: AccountId::from(a.public()).into(),
new_free: 0,
new_reserved: 0,
}
.into(),
),
},
Some(start_nonce + (i as u32) * 2),
),
// Give back funds
create_extrinsic(
client,
Sr25519Keyring::Alice.pair(),
SudoCall::sudo {
call: Box::new(
BalancesCall::set_balance {
who: AccountId::from(a.public()).into(),
new_free: 1_000_000 * DOLLARS,
new_reserved: 0,
}
.into(),
),
},
Some(start_nonce + (i as u32) * 2 + 1),
),
]
})
.flatten()
.map(OpaqueExtrinsic::from)
.collect()
}

fn create_benchmark_extrinsics(
client: &FullClient,
accounts: &[sr25519::Pair],
extrinsics_per_account: usize,
) -> Vec<OpaqueExtrinsic> {
accounts
.iter()
.map(|account| {
(0..extrinsics_per_account).map(move |nonce| {
create_extrinsic(
client,
account.clone(),
BalancesCall::transfer {
dest: Sr25519Keyring::Bob.to_account_id().into(),
value: 1 * DOLLARS,
},
Some(nonce as u32),
)
})
})
.flatten()
.map(OpaqueExtrinsic::from)
.collect()
}

async fn submit_tx_and_wait_for_inclusion(
tx_pool: &TransactionPool,
tx: OpaqueExtrinsic,
client: &FullClient,
wait_for_finalized: bool,
) {
let best_hash = client.chain_info().best_hash;

let mut watch = tx_pool
.submit_and_watch(&BlockId::Hash(best_hash), TransactionSource::External, tx.clone())
.await
.expect("Submits tx to pool")
.fuse();

loop {
match watch.select_next_some().await {
TransactionStatus::Finalized(_) => break,
TransactionStatus::InBlock(_) if !wait_for_finalized => break,
_ => {},
}
}
}

fn transaction_pool_benchmarks(c: &mut Criterion) {
sp_tracing::try_init_simple();

let runtime = tokio::runtime::Runtime::new().expect("Creates tokio runtime");
let tokio_handle = runtime.handle().clone();

let node = new_node(tokio_handle.clone());

let account_num = 10;
let extrinsics_per_account = 2000;
let accounts = create_accounts(account_num);

let mut group = c.benchmark_group("Transaction pool");

group.sample_size(10);
group.throughput(Throughput::Elements(account_num as u64 * extrinsics_per_account as u64));

let mut counter = 1;
group.bench_function(
format!("{} transfers from {} accounts", account_num * extrinsics_per_account, account_num),
move |b| {
b.iter_batched(
|| {
let prepare_extrinsics = create_account_extrinsics(&*node.client, &accounts);

runtime.block_on(future::join_all(prepare_extrinsics.into_iter().map(|tx| {
submit_tx_and_wait_for_inclusion(
&node.transaction_pool,
tx,
&*node.client,
true,
)
})));

create_benchmark_extrinsics(&*node.client, &accounts, extrinsics_per_account)
},
|extrinsics| {
runtime.block_on(future::join_all(extrinsics.into_iter().map(|tx| {
submit_tx_and_wait_for_inclusion(
&node.transaction_pool,
tx,
&*node.client,
false,
)
})));

println!("Finished {}", counter);
counter += 1;
},
BatchSize::SmallInput,
)
},
);
}

criterion_group!(benches, transaction_pool_benchmarks);
criterion_main!(benches);
8 changes: 8 additions & 0 deletions client/cli/src/commands/run_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ pub struct RunCmd {
#[structopt(long = "ws-max-connections", value_name = "COUNT")]
pub ws_max_connections: Option<usize>,

/// Set the the maximum WebSocket output buffer size in MiB. Default is 16.
#[structopt(long = "ws-max-out-buffer-capacity")]
pub ws_max_out_buffer_capacity: Option<usize>,

/// Specify browser Origins allowed to access the HTTP & WS RPC servers.
///
/// A comma-separated list of origins (protocol://domain or special `null`
Expand Down Expand Up @@ -432,6 +436,10 @@ impl CliConfiguration for RunCmd {
Ok(self.rpc_max_payload)
}

fn ws_max_out_buffer_capacity(&self) -> Result<Option<usize>> {
Ok(self.ws_max_out_buffer_capacity)
}

fn transaction_pool(&self) -> Result<TransactionPoolOptions> {
Ok(self.pool_config.transaction_pool())
}
Expand Down
6 changes: 6 additions & 0 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
Ok(None)
}

/// Get maximum WS output buffer capacity.
fn ws_max_out_buffer_capacity(&self) -> Result<Option<usize>> {
Ok(None)
}

/// Get the prometheus configuration (`None` if disabled)
///
/// By default this is `None`.
Expand Down Expand Up @@ -513,6 +518,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
rpc_ws_max_connections: self.rpc_ws_max_connections()?,
rpc_cors: self.rpc_cors(is_dev)?,
rpc_max_payload: self.rpc_max_payload()?,
ws_max_out_buffer_capacity: self.ws_max_out_buffer_capacity()?,
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,
telemetry_endpoints,
default_heap_pages: self.default_heap_pages()?,
Expand Down
21 changes: 19 additions & 2 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const MEGABYTE: usize = 1024 * 1024;
/// Maximal payload accepted by RPC servers.
pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;

/// Maximal buffer size in WS server.
pub const WS_MAX_BUFFER_CAPACITY_DEFAULT: usize = 16 * MEGABYTE;

/// Default maximum number of connections for WS RPC servers.
const WS_MAX_CONNECTIONS: usize = 100;

Expand Down Expand Up @@ -172,18 +175,32 @@ pub fn start_ws<
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
maybe_max_out_buffer_capacity_mb: Option<usize>,
server_metrics: ServerMetrics,
tokio_handle: tokio::runtime::Handle,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
let max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
let max_out_buffer_capacity = maybe_max_out_buffer_capacity_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(WS_MAX_BUFFER_CAPACITY_DEFAULT);

if max_payload > max_out_buffer_capacity {
log::warn!(
"maximum payload ({}) is more than maximum output buffer ({}) size in ws server, the payload will actually be limited by the buffer size",
max_payload,
max_out_buffer_capacity,
)
}

ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
context.sender().into()
})
.event_loop_executor(tokio_handle)
.max_payload(rpc_max_payload)
.max_payload(max_payload)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.max_out_buffer_capacity(max_out_buffer_capacity)
.allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some()))
.session_stats(server_metrics)
Expand Down
2 changes: 2 additions & 0 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub struct Configuration {
pub rpc_methods: RpcMethods,
/// Maximum payload of rpc request/responses.
pub rpc_max_payload: Option<usize>,
/// Maximum size of the output buffer capacity for websocket connections.
pub ws_max_out_buffer_capacity: Option<usize>,
/// Prometheus endpoint configuration. `None` if disabled.
pub prometheus_config: Option<PrometheusConfig>,
/// Telemetry service URL. `None` if disabled.
Expand Down
1 change: 1 addition & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ fn start_rpc_servers<
),
)?,
config.rpc_max_payload,
config.ws_max_out_buffer_capacity,
server_metrics.clone(),
config.tokio_handle.clone(),
)
Expand Down
1 change: 1 addition & 0 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ fn node_config<
rpc_cors: None,
rpc_methods: Default::default(),
rpc_max_payload: None,
ws_max_out_buffer_capacity: None,
prometheus_config: None,
telemetry_endpoints: None,
default_heap_pages: None,
Expand Down
1 change: 1 addition & 0 deletions test-utils/test-runner/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub fn default_config(tokio_handle: Handle, mut chain_spec: Box<dyn ChainSpec>)
rpc_cors: None,
rpc_methods: Default::default(),
rpc_max_payload: None,
ws_max_out_buffer_capacity: None,
prometheus_config: None,
telemetry_endpoints: None,
default_heap_pages: None,
Expand Down

0 comments on commit a4b7b67

Please sign in to comment.