From b15efa8941eaabb5ad909237ca56a8be08d36749 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 6 Feb 2019 19:03:05 +0100 Subject: [PATCH] Limit transaction pool size (#1676) * Avoid excessive hashing. Store extrinsic len. * Implement pool limits. * Fix issues. * Make sure we return error in case it doesn't make into the pool. * Pass parameters from CLI. * Remove redundant todo. * Fix tests. --- Cargo.lock | 1 + core/cli/src/lib.rs | 26 ++- core/cli/src/params.rs | 15 ++ core/service/src/lib.rs | 14 +- core/transaction-pool/Cargo.toml | 2 +- core/transaction-pool/graph/Cargo.toml | 1 + core/transaction-pool/graph/src/base_pool.rs | 107 +++++++++- core/transaction-pool/graph/src/error.rs | 11 +- core/transaction-pool/graph/src/future.rs | 26 +++ core/transaction-pool/graph/src/pool.rs | 196 +++++++++++++++++-- core/transaction-pool/graph/src/ready.rs | 24 ++- core/transaction-pool/graph/src/rotator.rs | 2 + core/transaction-pool/src/api.rs | 6 +- core/transaction-pool/src/tests.rs | 5 +- 14 files changed, 398 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9f46456006a6..7c79f2aede8c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4125,6 +4125,7 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.85 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.85 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index 7acc0f70992d2..0746e0a8733bd 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -48,7 +48,7 @@ use structopt::{StructOpt, clap::AppSettings}; pub use structopt::clap::App; use params::{ RunCmd, PurgeChainCmd, RevertCmd, ImportBlocksCmd, ExportBlocksCmd, BuildSpecCmd, - NetworkConfigurationParams, SharedParams, MergeParameters + NetworkConfigurationParams, SharedParams, MergeParameters, TransactionPoolParams, }; pub use params::{NoCustom, CoreParams}; pub use traits::{GetLogFilter, AugmentClap}; @@ -103,7 +103,7 @@ fn generate_node_name() -> String { break node_name } }; - + result } @@ -237,6 +237,23 @@ fn parse_node_key(key: Option) -> error::Result> { } } +/// Fill the given `PoolConfiguration` by looking at the cli parameters. +fn fill_transaction_pool_configuration( + options: &mut FactoryFullConfiguration, + params: TransactionPoolParams, +) -> error::Result<()> { + // ready queue + options.transaction_pool.ready.count = params.pool_limit; + options.transaction_pool.ready.total_bytes = params.pool_kbytes * 1024; + + // future queue + let factor = 10; + options.transaction_pool.future.count = params.pool_limit / factor; + options.transaction_pool.future.total_bytes = params.pool_kbytes * 1024 / factor; + + Ok(()) +} + /// Fill the given `NetworkConfiguration` by looking at the cli parameters. fn fill_network_configuration( cli: NetworkConfigurationParams, @@ -356,6 +373,11 @@ where client_id, )?; + fill_transaction_pool_configuration::( + &mut config, + cli.pool_config, + )?; + if let Some(key) = cli.key { config.keys.push(key); } diff --git a/core/cli/src/params.rs b/core/cli/src/params.rs index c1135afb2e683..773b258ce4f81 100644 --- a/core/cli/src/params.rs +++ b/core/cli/src/params.rs @@ -109,6 +109,17 @@ pub struct NetworkConfigurationParams { pub in_peers: u32, } +/// Parameters used to create the pool configuration. +#[derive(Debug, StructOpt, Clone)] +pub struct TransactionPoolParams { + /// Maximum number of transactions in the transaction pool. + #[structopt(long = "pool-limit", value_name = "COUNT", default_value = "512")] + pub pool_limit: usize, + /// Maximum number of kilobytes of all transactions stored in the pool. + #[structopt(long = "pool-kbytes", value_name = "COUNT", default_value="10240")] + pub pool_kbytes: usize, +} + /// The `run` command used to run a node. #[derive(Debug, StructOpt, Clone)] pub struct RunCmd { @@ -183,6 +194,10 @@ pub struct RunCmd { #[allow(missing_docs)] #[structopt(flatten)] pub network_config: NetworkConfigurationParams, + + #[allow(missing_docs)] + #[structopt(flatten)] + pub pool_config: TransactionPoolParams, } impl_augment_clap!(RunCmd); diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index e8b2581daafc1..b329947781a77 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -452,16 +452,16 @@ impl network::TransactionPool, ComponentBlock< let encoded = transaction.encode(); if let Some(uxt) = Decode::decode(&mut &encoded[..]) { let best_block_id = self.best_block_id()?; - let hash = self.pool.hash_of(&uxt); match self.pool.submit_one(&best_block_id, uxt) { Ok(hash) => Some(hash), Err(e) => match e.into_pool_error() { - Ok(e) => match e.kind() { - txpool::error::ErrorKind::AlreadyImported => Some(hash), - _ => { - debug!("Error adding transaction to the pool: {:?}", e); - None - }, + Ok(txpool::error::Error(txpool::error::ErrorKind::AlreadyImported(hash), _)) => { + hash.downcast::>().ok() + .map(|x| x.as_ref().clone()) + }, + Ok(e) => { + debug!("Error adding transaction to the pool: {:?}", e); + None }, Err(e) => { debug!("Error converting pool error: {:?}", e); diff --git a/core/transaction-pool/Cargo.toml b/core/transaction-pool/Cargo.toml index aa31092659918..1dd765bb0efa2 100644 --- a/core/transaction-pool/Cargo.toml +++ b/core/transaction-pool/Cargo.toml @@ -16,5 +16,5 @@ substrate-primitives = { path = "../primitives" } txpool = { package = "substrate-transaction-graph", path = "./graph" } [dev-dependencies] -test_client = { package = "substrate-test-client", path = "../../core/test-client" } keyring = { package = "substrate-keyring", path = "../../core/keyring" } +test_client = { package = "substrate-test-client", path = "../../core/test-client" } diff --git a/core/transaction-pool/graph/Cargo.toml b/core/transaction-pool/graph/Cargo.toml index 05601ae7198d9..5872a385991b1 100644 --- a/core/transaction-pool/graph/Cargo.toml +++ b/core/transaction-pool/graph/Cargo.toml @@ -15,4 +15,5 @@ sr-primitives = { path = "../../sr-primitives" } [dev-dependencies] assert_matches = "1.1" +parity-codec = "3.0" test_runtime = { package = "substrate-test-runtime", path = "../../test-runtime" } diff --git a/core/transaction-pool/graph/src/base_pool.rs b/core/transaction-pool/graph/src/base_pool.rs index 6b63e22b17cac..e2d21de214037 100644 --- a/core/transaction-pool/graph/src/base_pool.rs +++ b/core/transaction-pool/graph/src/base_pool.rs @@ -86,6 +86,8 @@ pub struct PruneStatus { pub struct Transaction { /// Raw extrinsic representing that transaction. pub data: Extrinsic, + /// Number of bytes encoding of the transaction requires. + pub bytes: usize, /// Transaction hash (unique) pub hash: Hash, /// Transaction priority (higher = better) @@ -136,7 +138,7 @@ impl BasePool, ) -> error::Result> { if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { - bail!(error::ErrorKind::AlreadyImported) + bail!(error::ErrorKind::AlreadyImported(Box::new(tx.hash.clone()))) } let tx = WaitingTransaction::new(tx, self.ready.provided_tags()); @@ -243,6 +245,58 @@ impl BasePool Vec>> { + let mut removed = vec![]; + + while ready.is_exceeded(self.ready.len(), self.ready.bytes()) { + // find the worst transaction + let minimal = self.ready + .fold(|minimal, current| { + let transaction = ¤t.transaction; + match minimal { + None => Some(transaction.clone()), + Some(ref tx) if tx.insertion_id > transaction.insertion_id => { + Some(transaction.clone()) + }, + other => other, + } + }); + + if let Some(minimal) = minimal { + removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()])) + } else { + break; + } + } + + while future.is_exceeded(self.future.len(), self.future.bytes()) { + // find the worst transaction + let minimal = self.future + .fold(|minimal, current| { + match minimal { + None => Some(current.clone()), + Some(ref tx) if tx.imported_at > current.imported_at => { + Some(current.clone()) + }, + other => other, + } + }); + + if let Some(minimal) = minimal { + removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()])) + } else { + break; + } + } + + removed + } + /// Removes all transactions represented by the hashes and all other transactions /// that depend on them. /// @@ -298,7 +352,9 @@ impl BasePool Status { Status { ready: self.ready.len(), + ready_bytes: self.ready.bytes(), future: self.future.len(), + future_bytes: self.future.bytes(), } } } @@ -307,8 +363,12 @@ impl BasePool bool { + self.count < count || self.total_bytes < bytes + } +} + #[cfg(test)] mod tests { use super::*; @@ -336,6 +412,7 @@ mod tests { // when pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1u64, priority: 5u64, valid_till: 64u64, @@ -356,6 +433,7 @@ mod tests { // when pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -364,6 +442,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -385,6 +464,7 @@ mod tests { // when pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -395,6 +475,7 @@ mod tests { assert_eq!(pool.ready.len(), 0); pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -415,6 +496,7 @@ mod tests { // when pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -423,6 +505,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![3u8], + bytes: 1, hash: 3, priority: 5u64, valid_till: 64u64, @@ -431,6 +514,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -439,6 +523,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![4u8], + bytes: 1, hash: 4, priority: 1_000u64, valid_till: 64u64, @@ -450,6 +535,7 @@ mod tests { let res = pool.import(Transaction { data: vec![5u8], + bytes: 1, hash: 5, priority: 5u64, valid_till: 64u64, @@ -480,6 +566,7 @@ mod tests { let mut pool = pool(); pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -488,6 +575,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![3u8], + bytes: 1, hash: 3, priority: 5u64, valid_till: 64u64, @@ -500,6 +588,7 @@ mod tests { // when pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -518,6 +607,7 @@ mod tests { // let's close the cycle with one additional transaction let res = pool.import(Transaction { data: vec![4u8], + bytes: 1, hash: 4, priority: 50u64, valid_till: 64u64, @@ -545,6 +635,7 @@ mod tests { let mut pool = pool(); pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -553,6 +644,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![3u8], + bytes: 1, hash: 3, priority: 5u64, valid_till: 64u64, @@ -565,6 +657,7 @@ mod tests { // when pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -583,6 +676,7 @@ mod tests { // let's close the cycle with one additional transaction let err = pool.import(Transaction { data: vec![4u8], + bytes: 1, hash: 4, priority: 1u64, // lower priority than Tx(2) valid_till: 64u64, @@ -605,6 +699,7 @@ mod tests { let mut pool = pool(); pool.import(Transaction { data: vec![5u8], + bytes: 1, hash: 5, priority: 5u64, valid_till: 64u64, @@ -613,6 +708,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -621,6 +717,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![3u8], + bytes: 1, hash: 3, priority: 5u64, valid_till: 64u64, @@ -629,6 +726,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -637,6 +735,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![4u8], + bytes: 1, hash: 4, priority: 1_000u64, valid_till: 64u64, @@ -646,6 +745,7 @@ mod tests { // future pool.import(Transaction { data: vec![6u8], + bytes: 1, hash: 6, priority: 1_000u64, valid_till: 64u64, @@ -671,6 +771,7 @@ mod tests { // future (waiting for 0) pool.import(Transaction { data: vec![5u8], + bytes: 1, hash: 5, priority: 5u64, valid_till: 64u64, @@ -680,6 +781,7 @@ mod tests { // ready pool.import(Transaction { data: vec![1u8], + bytes: 1, hash: 1, priority: 5u64, valid_till: 64u64, @@ -688,6 +790,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![2u8], + bytes: 1, hash: 2, priority: 5u64, valid_till: 64u64, @@ -696,6 +799,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![3u8], + bytes: 1, hash: 3, priority: 5u64, valid_till: 64u64, @@ -704,6 +808,7 @@ mod tests { }).unwrap(); pool.import(Transaction { data: vec![4u8], + bytes: 1, hash: 4, priority: 1_000u64, valid_till: 64u64, diff --git a/core/transaction-pool/graph/src/error.rs b/core/transaction-pool/graph/src/error.rs index fb6dc6329ec36..ca2ecb662ede9 100644 --- a/core/transaction-pool/graph/src/error.rs +++ b/core/transaction-pool/graph/src/error.rs @@ -39,9 +39,9 @@ error_chain! { display("Temporarily Banned"), } /// The transaction is already in the pool. - AlreadyImported { - description("Transaction is already in the pool."), - display("Already imported"), + AlreadyImported(hash: Box<::std::any::Any + Send>) { + description("Transaction is already in the pool"), + display("[{:?}] Already imported", hash), } /// The transaction cannot be imported cause it's a replacement and has too low priority. TooLowPriority(old: Priority, new: Priority) { @@ -53,6 +53,11 @@ error_chain! { description("Transaction was not imported because of detected cycle."), display("Cycle Detected"), } + /// Transaction was dropped immediately after it got inserted. + ImmediatelyDropped { + description("Transaction couldn't enter the pool because of the limit."), + display("Immediately Dropped"), + } } } diff --git a/core/transaction-pool/graph/src/future.rs b/core/transaction-pool/graph/src/future.rs index 43fe513f9ac77..8e7e819f0dd89 100644 --- a/core/transaction-pool/graph/src/future.rs +++ b/core/transaction-pool/graph/src/future.rs @@ -18,6 +18,7 @@ use std::{ collections::{HashMap, HashSet}, hash, sync::Arc, + time, }; use sr_primitives::transaction_validity::{ @@ -33,6 +34,18 @@ pub struct WaitingTransaction { pub transaction: Arc>, /// Tags that are required and have not been satisfied yet by other transactions in the pool. pub missing_tags: HashSet, + /// Time of import to the Future Queue. + pub imported_at: time::Instant, +} + +impl Clone for WaitingTransaction { + fn clone(&self) -> Self { + WaitingTransaction { + transaction: self.transaction.clone(), + missing_tags: self.missing_tags.clone(), + imported_at: self.imported_at.clone(), + } + } } impl WaitingTransaction { @@ -50,6 +63,7 @@ impl WaitingTransaction { WaitingTransaction { transaction: Arc::new(transaction), missing_tags, + imported_at: time::Instant::now(), } } @@ -174,6 +188,13 @@ impl FutureTransactions { removed } + /// Fold a list of future transactions to compute a single value. + pub fn fold, &WaitingTransaction) -> Option>(&mut self, f: F) -> Option { + self.waiting + .values() + .fold(None, f) + } + /// Returns iterator over all future transactions pub fn all(&self) -> impl Iterator> { self.waiting.values().map(|waiting| &*waiting.transaction) @@ -183,4 +204,9 @@ impl FutureTransactions { pub fn len(&self) -> usize { self.waiting.len() } + + /// Returns sum of encoding lengths of all transactions in this queue. + pub fn bytes(&self) -> usize { + self.waiting.values().fold(0, |acc, tx| acc + tx.transaction.bytes) + } } diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index eb0ff4876f1dc..f705385f94aeb 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use std::{ - collections::HashMap, + collections::{HashSet, HashMap}, hash, sync::Arc, time, @@ -38,6 +38,8 @@ use sr_primitives::{ transaction_validity::{TransactionValidity, TransactionTag as Tag}, }; +pub use crate::base_pool::Limit; + /// Modification notification event stream type; pub type EventStream = mpsc::UnboundedReceiver<()>; @@ -70,17 +72,38 @@ pub trait ChainApi: Send + Sync { /// Returns a block hash given the block id. fn block_id_to_hash(&self, at: &BlockId) -> Result>, Self::Error>; - /// Hash the extrinsic. - fn hash(&self, uxt: &ExtrinsicFor) -> Self::Hash; + /// Returns hash and encoding length of the extrinsic. + fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); } /// Pool configuration options. -#[derive(Debug, Clone, Default)] -pub struct Options; +#[derive(Debug, Clone)] +pub struct Options { + /// Ready queue limits. + pub ready: Limit, + /// Future queue limits. + pub future: Limit, +} + +impl Default for Options { + fn default() -> Self { + Options { + ready: Limit { + count: 512, + total_bytes: 10 * 1024 * 1024, + }, + future: Limit { + count: 128, + total_bytes: 1 * 1024 * 1024, + }, + } + } +} /// Extrinsics pool. pub struct Pool { api: B, + options: Options, listener: RwLock, BlockHash>>, pool: RwLock, @@ -91,7 +114,6 @@ pub struct Pool { } impl Pool { - /// Imports a bunch of unverified extrinsics to the pool pub fn submit_at(&self, at: &BlockId, xts: T) -> Result, B::Error>>, B::Error> where T: IntoIterator> @@ -99,10 +121,10 @@ impl Pool { let block_number = self.api.block_id_to_number(at)? .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; - Ok(xts + let results = xts .into_iter() .map(|xt| -> Result<_, B::Error> { - let hash = self.api.hash(&xt); + let (hash, bytes) = self.api.hash_and_length(&xt); if self.rotator.is_banned(&hash) { bail!(error::Error::from(error::ErrorKind::TemporarilyBanned)) } @@ -111,6 +133,7 @@ impl Pool { TransactionValidity::Valid { priority, requires, provides, longevity } => { Ok(base::Transaction { data: xt, + bytes, hash, priority, requires, @@ -138,7 +161,42 @@ impl Pool { fire_events(&mut *listener, &imported); Ok(imported.hash().clone()) }) - .collect()) + .collect::>(); + + let removed = self.enforce_limits(); + + Ok(results.into_iter().map(|res| match res { + Ok(ref hash) if removed.contains(hash) => Err(error::Error::from(error::ErrorKind::ImmediatelyDropped).into()), + other => other, + }).collect()) + } + + fn enforce_limits(&self) -> HashSet> { + let status = self.pool.read().status(); + let ready_limit = &self.options.ready; + let future_limit = &self.options.future; + + if ready_limit.is_exceeded(status.ready, status.ready_bytes) + || future_limit.is_exceeded(status.future, status.future_bytes) { + // clean up the pool + let removed = { + let mut pool = self.pool.write(); + let removed = pool.enforce_limits(ready_limit, future_limit) + .into_iter().map(|x| x.hash.clone()).collect::>(); + // ban all removed transactions + self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); + removed + }; + // run notifications + let mut listener = self.listener.write(); + for h in &removed { + listener.dropped(h, None); + } + + removed + } else { + Default::default() + } } /// Imports one unverified extrinsic to the pool @@ -148,7 +206,7 @@ impl Pool { /// Import a single extrinsic and starts to watch their progress in the pool. pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, BlockHash>, B::Error> { - let hash = self.api.hash(&xt); + let hash = self.api.hash_and_length(&xt).0; let watcher = self.listener.write().create_watcher(hash); self.submit_one(at, xt)?; Ok(watcher) @@ -163,7 +221,7 @@ impl Pool { pub fn prune(&self, at: &BlockId, parent: &BlockId, extrinsics: &[ExtrinsicFor]) -> Result<(), B::Error> { let mut tags = Vec::with_capacity(extrinsics.len()); // Get details of all extrinsics that are already in the pool - let hashes = extrinsics.iter().map(|extrinsic| self.api.hash(extrinsic)).collect::>(); + let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::>(); let in_pool = self.pool.read().by_hash(&hashes); { // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) @@ -303,13 +361,12 @@ impl Pool { Ok(()) } -} -impl Pool { /// Create a new transaction pool. - pub fn new(_options: Options, api: B) -> Self { + pub fn new(options: Options, api: B) -> Self { Pool { api, + options, listener: Default::default(), pool: Default::default(), import_notification_sinks: Default::default(), @@ -359,8 +416,9 @@ impl Pool { } /// Returns transaction hash - pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { - self.api.hash(xt) + #[cfg(test)] + fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { + self.api.hash_and_length(xt).0 } } @@ -394,6 +452,7 @@ fn fire_events( mod tests { use super::*; use futures::Stream; + use parity_codec::Encode; use test_runtime::{Block, Extrinsic, Transfer, H256}; use assert_matches::assert_matches; use crate::watcher; @@ -440,8 +499,12 @@ mod tests { } /// Hash the extrinsic. - fn hash(&self, uxt: &ExtrinsicFor) -> Self::Hash { - (uxt.transfer().from.to_low_u64_be() << 5) + uxt.transfer().nonce + fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize) { + let len = uxt.encode().len(); + ( + (uxt.transfer().from.to_low_u64_be() << 5) + uxt.transfer().nonce, + len + ) } } @@ -586,6 +649,66 @@ mod tests { assert!(pool.rotator.is_banned(&hash1)); } + #[test] + fn should_limit_futures() { + // given + let limit = Limit { + count: 100, + total_bytes: 200, + }; + let pool = Pool::new(Options { + ready: limit.clone(), + future: limit.clone(), + }, TestApi::default()); + + let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: H256::from_low_u64_be(1), + to: H256::from_low_u64_be(2), + amount: 5, + nonce: 1, + })).unwrap(); + assert_eq!(pool.status().future, 1); + + // when + let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: H256::from_low_u64_be(2), + to: H256::from_low_u64_be(2), + amount: 5, + nonce: 10, + })).unwrap(); + + // then + assert_eq!(pool.status().future, 1); + assert!(pool.rotator.is_banned(&hash1)); + assert!(!pool.rotator.is_banned(&hash2)); + } + + #[test] + fn should_error_if_reject_immediately() { + // given + let limit = Limit { + count: 100, + total_bytes: 10, + }; + let pool = Pool::new(Options { + ready: limit.clone(), + future: limit.clone(), + }, TestApi::default()); + + // when + pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: H256::from_low_u64_be(1), + to: H256::from_low_u64_be(2), + amount: 5, + nonce: 1, + })).unwrap_err(); + + // then + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 0); + } + + mod listener { use super::*; @@ -716,5 +839,42 @@ mod tests { assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(Ok(watcher::Status::Broadcast(peers)))); } + + #[test] + fn should_trigger_dropped() { + // given + let limit = Limit { + count: 1, + total_bytes: 1000, + }; + let pool = Pool::new(Options { + ready: limit.clone(), + future: limit.clone(), + }, TestApi::default()); + + let xt = uxt(Transfer { + from: H256::from_low_u64_be(1), + to: H256::from_low_u64_be(2), + amount: 5, + nonce: 0, + }); + let watcher = pool.submit_and_watch(&BlockId::Number(0), xt).unwrap(); + assert_eq!(pool.status().ready, 1); + + // when + let xt = uxt(Transfer { + from: H256::from_low_u64_be(2), + to: H256::from_low_u64_be(1), + amount: 4, + nonce: 1, + }); + pool.submit_one(&BlockId::Number(1), xt).unwrap(); + assert_eq!(pool.status().ready, 1); + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); + assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped))); + } } } diff --git a/core/transaction-pool/graph/src/ready.rs b/core/transaction-pool/graph/src/ready.rs index eb3d61f397556..faca722d12c37 100644 --- a/core/transaction-pool/graph/src/ready.rs +++ b/core/transaction-pool/graph/src/ready.rs @@ -34,9 +34,14 @@ use crate::error; use crate::future::WaitingTransaction; use crate::base_pool::Transaction; +/// An in-pool transaction reference. +/// +/// Should be cheap to clone. #[derive(Debug)] -struct TransactionRef { +pub struct TransactionRef { + /// The actual transaction data. pub transaction: Arc>, + /// Unique id when transaction was inserted into the pool. pub insertion_id: u64, } @@ -71,7 +76,7 @@ impl PartialEq for TransactionRef { impl Eq for TransactionRef {} #[derive(Debug)] -struct ReadyTx { +pub struct ReadyTx { /// A reference to a transaction pub transaction: TransactionRef, /// A list of transactions that get unlocked by this one @@ -152,6 +157,7 @@ impl ReadyTransactions { /// /// The transaction needs to have all tags satisfied (be ready) by transactions /// that are in this queue. + /// Returns transactions that were replaced by the one imported. pub fn import( &mut self, tx: WaitingTransaction, @@ -204,6 +210,14 @@ impl ReadyTransactions { Ok(replaced) } + /// Fold a list of ready transactions to compute a single value. + pub fn fold, &ReadyTx) -> Option>(&mut self, f: F) -> Option { + self.ready + .read() + .values() + .fold(None, f) + } + /// Returns true if given hash is part of the queue. pub fn contains(&self, hash: &Hash) -> bool { self.ready.read().contains_key(hash) @@ -401,6 +415,10 @@ impl ReadyTransactions { self.ready.read().len() } + /// Returns sum of encoding lengths of all transactions in this queue. + pub fn bytes(&self) -> usize { + self.ready.read().values().fold(0, |acc, tx| acc + tx.transaction.transaction.bytes) + } } pub struct BestIterator { @@ -476,6 +494,7 @@ mod tests { fn tx(id: u8) -> Transaction> { Transaction { data: vec![id], + bytes: 1, hash: id as u64, priority: 1, valid_till: 2, @@ -534,6 +553,7 @@ mod tests { tx4.provides = vec![]; let tx5 = Transaction { data: vec![5], + bytes: 1, hash: 5, priority: 1, valid_till: u64::max_value(), // use the max_value() here for testing. diff --git a/core/transaction-pool/graph/src/rotator.rs b/core/transaction-pool/graph/src/rotator.rs index 6ca10a900556a..eed7e628932bc 100644 --- a/core/transaction-pool/graph/src/rotator.rs +++ b/core/transaction-pool/graph/src/rotator.rs @@ -114,6 +114,7 @@ mod tests { let hash = 5u64; let tx = Transaction { data: (), + bytes: 1, hash: hash.clone(), priority: 5, valid_till: 1, @@ -178,6 +179,7 @@ mod tests { let hash = i; Transaction { data: (), + bytes: 2, hash, priority: 5, valid_till, diff --git a/core/transaction-pool/src/api.rs b/core/transaction-pool/src/api.rs index 0d6c60a7cbb86..dd8cb95c7ddd6 100644 --- a/core/transaction-pool/src/api.rs +++ b/core/transaction-pool/src/api.rs @@ -75,7 +75,9 @@ impl txpool::ChainApi for ChainApi where Ok(self.client.block_hash_from_id(at)?) } - fn hash(&self, ex: &txpool::ExtrinsicFor) -> Self::Hash { - Blake2Hasher::hash(&ex.encode()) + fn hash_and_length(&self, ex: &txpool::ExtrinsicFor) -> (Self::Hash, usize) { + ex.using_encoded(|x| { + (Blake2Hasher::hash(x), x.len()) + }) } } diff --git a/core/transaction-pool/src/tests.rs b/core/transaction-pool/src/tests.rs index cb37dc07a0ca0..770f00fa64916 100644 --- a/core/transaction-pool/src/tests.rs +++ b/core/transaction-pool/src/tests.rs @@ -68,8 +68,9 @@ impl txpool::ChainApi for TestApi { }) } - fn hash(&self, ex: &txpool::ExtrinsicFor) -> Self::Hash { - BlakeTwo256::hash(&ex.encode()) + fn hash_and_length(&self, ex: &txpool::ExtrinsicFor) -> (Self::Hash, usize) { + let encoded = ex.encode(); + (BlakeTwo256::hash(&encoded), encoded.len()) } }