diff --git a/Cargo.lock b/Cargo.lock index f3a76716ae4c0..de35b70c5546f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2441,6 +2441,7 @@ name = "node-rpc" version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-derive 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5380,6 +5381,7 @@ dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", "substrate-client 2.0.0", @@ -5536,6 +5538,7 @@ name = "substrate-transaction-pool" version = "2.0.0" dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/basic-authorship/src/basic_authorship.rs b/core/basic-authorship/src/basic_authorship.rs index 59b12ba1e40b4..7f8b343f6512c 100644 --- a/core/basic-authorship/src/basic_authorship.rs +++ b/core/basic-authorship/src/basic_authorship.rs @@ -247,10 +247,12 @@ mod tests { fn should_cease_building_block_when_deadline_is_reached() { // given let client = Arc::new(test_client::new()); - let chain_api = transaction_pool::ChainApi::new(client.clone()); + let chain_api = transaction_pool::FullChainApi::new(client.clone()); let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); - txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap(); + futures::executor::block_on( + txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false) + ).unwrap(); let mut proposer_factory = ProposerFactory { client: client.clone(), diff --git a/core/basic-authorship/src/lib.rs b/core/basic-authorship/src/lib.rs index 71c9e2792248f..7961e4fe9e9df 100644 --- a/core/basic-authorship/src/lib.rs +++ b/core/basic-authorship/src/lib.rs @@ -26,7 +26,7 @@ //! # use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring}; //! # use transaction_pool::txpool::{self, Pool as TransactionPool}; //! # let client = Arc::new(test_client::new()); -//! # let chain_api = transaction_pool::ChainApi::new(client.clone()); +//! # let chain_api = transaction_pool::FullChainApi::new(client.clone()); //! # let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); //! // The first step is to create a `ProposerFactory`. //! let mut proposer_factory = ProposerFactory { diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index ef581a6e43c3d..dc9e6688e74ee 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -963,6 +963,14 @@ impl, H: ExHashT> Protocol { who: PeerId, extrinsics: message::Transactions ) { + // sending extrinsic to light node is considered a bad behavior + if !self.config.roles.is_full() { + trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who); + self.behaviour.disconnect_peer(&who); + self.peerset_handle.report_peer(who, i32::min_value()); + return; + } + // Accept extrinsics only when fully synced if self.sync.status().state != SyncState::Idle { trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); @@ -971,12 +979,15 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { - if let Some(hash) = self.transaction_pool.import(&t) { - self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); - peer.known_extrinsics.insert(hash); - } else { - trace!(target: "sync", "Extrinsic rejected"); - } + let hash = self.transaction_pool.hash_of(&t); + peer.known_extrinsics.insert(hash); + + self.transaction_pool.import( + self.peerset_handle.clone().into(), + who.clone(), + NEW_EXTRINSIC_REPUTATION_CHANGE, + t, + ); } } } @@ -995,6 +1006,11 @@ impl, H: ExHashT> Protocol { let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { + // never send extrinsics to the light node + if !peer.info.roles.is_full() { + continue; + } + let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 2cf949116f012..5e8d41340c1bf 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -65,8 +65,18 @@ impl ExHashT for T where pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. fn transactions(&self) -> Vec<(H, B::Extrinsic)>; + /// Get hash of transaction. + fn hash_of(&self, transaction: &B::Extrinsic) -> H; /// Import a transaction into the pool. - fn import(&self, transaction: &B::Extrinsic) -> Option; + /// + /// Peer reputation is changed by reputation_change if transaction is accepted by the pool. + fn import( + &self, + report_handle: ReportHandle, + who: PeerId, + reputation_change: i32, + transaction: B::Extrinsic, + ); /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap>); } @@ -77,6 +87,12 @@ pub struct ReportHandle { inner: PeersetHandle, // wraps it so we don't have to worry about breaking API. } +impl From for ReportHandle { + fn from(peerset_handle: PeersetHandle) -> Self { + ReportHandle { inner: peerset_handle } + } +} + impl ReportHandle { /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 920636810ba6b..2c87ba1ac8593 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -47,7 +47,7 @@ use consensus::Error as ConsensusError; use consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport}; use futures::prelude::*; use futures03::{StreamExt as _, TryStreamExt as _}; -use crate::{NetworkWorker, NetworkService, config::ProtocolId}; +use crate::{NetworkWorker, NetworkService, ReportHandle, config::ProtocolId}; use crate::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder}; use libp2p::PeerId; use parking_lot::Mutex; @@ -400,10 +400,12 @@ impl TransactionPool for EmptyTransactionPool { Vec::new() } - fn import(&self, _transaction: &Extrinsic) -> Option { - None + fn hash_of(&self, _transaction: &Extrinsic) -> Hash { + Hash::default() } + fn import(&self, _report_handle: ReportHandle, _who: PeerId, _rep_change: i32, _transaction: Extrinsic) {} + fn on_broadcasted(&self, _: HashMap>) {} } diff --git a/core/offchain/src/api.rs b/core/offchain/src/api.rs index 62b73f28d6e49..d17a892d975e1 100644 --- a/core/offchain/src/api.rs +++ b/core/offchain/src/api.rs @@ -302,29 +302,28 @@ impl AsyncApi { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } - future::ready(()) }); future::join(extrinsics, http) .map(|((), ())| ()) } - fn submit_extrinsic(&mut self, ext: Vec) { + fn submit_extrinsic(&mut self, ext: Vec) -> impl Future { let xt = match ::Extrinsic::decode(&mut &*ext) { Ok(xt) => xt, Err(e) => { warn!("Unable to decode extrinsic: {:?}: {}", ext, e.what()); - return + return future::Either::Left(future::ready(())) }, }; info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed()); - match self.transaction_pool.submit_one(&self.at, xt.clone()) { - Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash), - Err(e) => { - debug!("Couldn't submit transaction: {:?}", e); - }, - } + future::Either::Right(self.transaction_pool + .submit_one(&self.at, xt.clone()) + .map(|result| match result { + Ok(hash) => { debug!("[{:?}] Offchain transaction added to the pool.", hash); }, + Err(e) => { debug!("Couldn't submit transaction: {:?}", e); }, + })) } } @@ -354,7 +353,7 @@ mod tests { let db = LocalStorage::new_test(); let client = Arc::new(test_client::new()); let pool = Arc::new( - Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())) + Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone())) ); let mock = Arc::new(MockNetworkStateInfo()); diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index 9b785ec8bada1..79c6df04ea109 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -171,7 +171,7 @@ mod tests { // given let _ = env_logger::try_init(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))); let db = client_db::offchain::LocalStorage::new_test(); let network_state = Arc::new(MockNetworkStateInfo()); diff --git a/core/rpc/api/src/author/error.rs b/core/rpc/api/src/author/error.rs index 727b58bd210f4..8e4f8877682b3 100644 --- a/core/rpc/api/src/author/error.rs +++ b/core/rpc/api/src/author/error.rs @@ -22,6 +22,9 @@ use jsonrpc_core as rpc; /// Author RPC Result type. pub type Result = std::result::Result; +/// Author RPC future Result type. +pub type FutureResult = Box + Send>; + /// Author RPC errors. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { diff --git a/core/rpc/api/src/author/mod.rs b/core/rpc/api/src/author/mod.rs index 5cde56995aad9..4ea96cb3c6122 100644 --- a/core/rpc/api/src/author/mod.rs +++ b/core/rpc/api/src/author/mod.rs @@ -24,7 +24,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use primitives::{ Bytes }; -use self::error::Result; +use self::error::{FutureResult, Result}; use txpool::watcher::Status; pub use self::gen_client::Client as AuthorClient; @@ -37,7 +37,7 @@ pub trait AuthorApi { /// Submit hex-encoded extrinsic for inclusion in block. #[rpc(name = "author_submitExtrinsic")] - fn submit_extrinsic(&self, extrinsic: Bytes) -> Result; + fn submit_extrinsic(&self, extrinsic: Bytes) -> FutureResult; /// Insert a key into the keystore. #[rpc(name = "author_insertKey")] diff --git a/core/rpc/api/src/lib.rs b/core/rpc/api/src/lib.rs index 78fa58f14af10..12db07633dcc4 100644 --- a/core/rpc/api/src/lib.rs +++ b/core/rpc/api/src/lib.rs @@ -25,7 +25,7 @@ mod helpers; mod subscriptions; pub use jsonrpc_core::IoHandlerExtension as RpcExtension; -pub use subscriptions::Subscriptions; +pub use subscriptions::{Subscriptions, TaskExecutor}; pub use helpers::Receiver; pub mod author; diff --git a/core/rpc/api/src/subscriptions.rs b/core/rpc/api/src/subscriptions.rs index f284e0ef5299d..bff184cadeac0 100644 --- a/core/rpc/api/src/subscriptions.rs +++ b/core/rpc/api/src/subscriptions.rs @@ -25,6 +25,9 @@ use jsonrpc_core::futures::{Future, future}; type Id = u64; +/// Alias for a an implementation of `futures::future::Executor`. +pub type TaskExecutor = Arc + Send>> + Send + Sync>; + /// Generate unique ids for subscriptions. #[derive(Clone, Debug)] pub struct IdProvider { @@ -53,12 +56,12 @@ impl IdProvider { pub struct Subscriptions { next_id: IdProvider, active_subscriptions: Arc>>>, - executor: Arc + Send>> + Send + Sync>, + executor: TaskExecutor, } impl Subscriptions { /// Creates new `Subscriptions` object. - pub fn new(executor: Arc + Send>> + Send + Sync>) -> Self { + pub fn new(executor: TaskExecutor) -> Self { Subscriptions { next_id: Default::default(), active_subscriptions: Default::default(), diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 50b5e30d578a3..9a978f22f717a 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -20,13 +20,18 @@ mod tests; use std::{sync::Arc, convert::TryInto}; +use futures03::future::{FutureExt, TryFutureExt}; +use log::warn; use client::{self, Client}; -use rpc::futures::{Sink, Future}; +use rpc::futures::{ + Sink, Future, + stream::Stream as _, + future::result, +}; use futures03::{StreamExt as _, compat::Compat}; use api::Subscriptions; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; -use log::warn; use codec::{Encode, Decode}; use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr}; use sr_primitives::{generic, traits::{self, ProvideRuntimeApi}}; @@ -44,7 +49,7 @@ use session::SessionKeys; /// Re-export the API for backward compatibility. pub use api::author::*; -use self::error::{Error, Result}; +use self::error::{Error, FutureResult, Result}; /// Authoring API pub struct Author where P: PoolChainApi + Sync + Send + 'static { @@ -108,15 +113,19 @@ impl AuthorApi, BlockHash

> for Author whe ).map(Into::into).map_err(|e| Error::Client(Box::new(e))) } - fn submit_extrinsic(&self, ext: Bytes) -> Result> { - let xt = Decode::decode(&mut &ext[..])?; + fn submit_extrinsic(&self, ext: Bytes) -> FutureResult> { + let xt = match Decode::decode(&mut &ext[..]) { + Ok(xt) => xt, + Err(err) => return Box::new(result(Err(err.into()))), + }; let best_block_hash = self.client.info().chain.best_hash; - self.pool + Box::new(self.pool .submit_one(&generic::BlockId::hash(best_block_hash), xt) + .compat() .map_err(|e| e.into_pool_error() .map(Into::into) - .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - ) + .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())) + ) } fn pending_extrinsics(&self) -> Result> { @@ -151,17 +160,20 @@ impl AuthorApi, BlockHash

> for Author whe ) { let submit = || -> Result<_> { let best_block_hash = self.client.info().chain.best_hash; - let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])?; - self.pool + let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]) + .map_err(error::Error::from)?; + Ok(self.pool .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) + .boxed() + .compat() .map_err(|e| e.into_pool_error() - .map(Into::into) + .map(error::Error::from) .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - ) + )) }; - let watcher = match submit() { - Ok(watcher) => watcher, + let future_watcher = match submit() { + Ok(future_watcher) => future_watcher, Err(err) => { // reject the subscriber (ignore errors - we don't care if subscriber is no longer there). let _ = subscriber.reject(err.into()); @@ -169,12 +181,23 @@ impl AuthorApi, BlockHash

> for Author whe }, }; + // make 'future' watcher be a future with output = stream of watcher events + let future_watcher = future_watcher + .map_err(|err| { warn!("Failed to submit extrinsic: {}", err); }) + .map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))); + + // convert a 'future' watcher into the stream with single element = stream of watcher events + let watcher_stream = future_watcher.into_stream(); + + // and now flatten the 'watcher_stream' so that we'll have the stream with watcher events + let watcher_stream = watcher_stream.flatten(); + self.subscriptions.add(subscriber, move |sink| { sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) + .send_all(watcher_stream) .map(|_| ()) - }) + }); } fn unwatch_extrinsic(&self, _metadata: Option, id: SubscriptionId) -> Result { diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index 1f652aec64a68..57d3929d928f7 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -21,9 +21,8 @@ use assert_matches::assert_matches; use codec::Encode; use transaction_pool::{ txpool::Pool, - ChainApi, + FullChainApi, }; -use futures::Stream; use primitives::{ H256, blake2_256, hexdisplay::HexDisplay, traits::BareCryptoStore, testing::{ED25519, SR25519, KeyStore}, ed25519, crypto::Pair @@ -51,7 +50,7 @@ fn submit_transaction_should_not_cause_error() { let keystore = KeyStore::new(); let p = Author { client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -59,11 +58,11 @@ fn submit_transaction_should_not_cause_error() { let h: H256 = blake2_256(&xt).into(); assert_matches!( - AuthorApi::submit_extrinsic(&p, xt.clone().into()), + AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(), Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_extrinsic(&p, xt.into()).is_err() + AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err() ); } @@ -74,7 +73,7 @@ fn submit_rich_transaction_should_not_cause_error() { let keystore = KeyStore::new(); let p = Author { client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -82,11 +81,11 @@ fn submit_rich_transaction_should_not_cause_error() { let h: H256 = blake2_256(&xt).into(); assert_matches!( - AuthorApi::submit_extrinsic(&p, xt.clone().into()), + AuthorApi::submit_extrinsic(&p, xt.clone().into()).wait(), Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_extrinsic(&p, xt.into()).is_err() + AuthorApi::submit_extrinsic(&p, xt.into()).wait().is_err() ); } @@ -95,7 +94,7 @@ fn should_watch_extrinsic() { //given let mut runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { client, @@ -120,7 +119,7 @@ fn should_watch_extrinsic() { }; tx.into_signed_tx() }; - AuthorApi::submit_extrinsic(&p, replacement.encode().into()).unwrap(); + AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap(); let (res, data) = runtime.block_on(data.into_future()).unwrap(); assert_eq!( res, @@ -137,7 +136,7 @@ fn should_watch_extrinsic() { fn should_return_pending_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { client, @@ -146,7 +145,7 @@ fn should_return_pending_extrinsics() { keystore: keystore.clone(), }; let ex = uxt(AccountKeyring::Alice, 0); - AuthorApi::submit_extrinsic(&p, ex.encode().into()).unwrap(); + AuthorApi::submit_extrinsic(&p, ex.encode().into()).wait().unwrap(); assert_matches!( p.pending_extrinsics(), Ok(ref expected) if *expected == vec![Bytes(ex.encode())] @@ -157,7 +156,7 @@ fn should_return_pending_extrinsics() { fn should_remove_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); let keystore = KeyStore::new(); let p = Author { client, @@ -166,11 +165,11 @@ fn should_remove_extrinsics() { keystore: keystore.clone(), }; let ex1 = uxt(AccountKeyring::Alice, 0); - p.submit_extrinsic(ex1.encode().into()).unwrap(); + p.submit_extrinsic(ex1.encode().into()).wait().unwrap(); let ex2 = uxt(AccountKeyring::Alice, 1); - p.submit_extrinsic(ex2.encode().into()).unwrap(); + p.submit_extrinsic(ex2.encode().into()).wait().unwrap(); let ex3 = uxt(AccountKeyring::Bob, 0); - let hash3 = p.submit_extrinsic(ex3.encode().into()).unwrap(); + let hash3 = p.submit_extrinsic(ex3.encode().into()).wait().unwrap(); assert_eq!(pool.status().ready, 3); // now remove all 3 @@ -190,7 +189,7 @@ fn should_insert_key() { let keystore = KeyStore::new(); let p = Author { client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; @@ -216,7 +215,7 @@ fn should_rotate_keys() { let client = Arc::new(test_client::TestClientBuilder::new().set_keystore(keystore.clone()).build()); let p = Author { client: client.clone(), - pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), + pool: Arc::new(Pool::new(Default::default(), FullChainApi::new(client))), subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index 3317ab23cd797..f0860de1c1cfa 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -26,7 +26,12 @@ use chain_spec::{RuntimeGenesis, Extension}; use codec::{Decode, Encode, IoReader}; use consensus_common::import_queue::ImportQueue; use futures::{prelude::*, sync::mpsc}; -use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _}; +use futures03::{ + compat::Compat, + future::ready, + FutureExt as _, TryFutureExt as _, + StreamExt as _, TryStreamExt as _, +}; use keystore::{Store as Keystore, KeyStorePtr}; use log::{info, warn}; use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; @@ -913,7 +918,7 @@ where RpcB: RpcBuilder, { use rpc::{chain, state, author, system}; - let subscriptions = rpc::Subscriptions::new(task_executor.clone()); + let subscriptions = rpc::Subscriptions::new(task_executor); let chain = rpc_builder.build_chain(subscriptions.clone()); let state = rpc_builder.build_state(subscriptions.clone()); let author = rpc::author::Author::new( @@ -935,45 +940,54 @@ where pub(crate) fn maintain_transaction_pool( id: &BlockId, - client: &Client, + client: &Arc>, transaction_pool: &TransactionPool, retracted: &[Block::Hash], -) -> error::Result<()> where +) -> error::Result + Send>> where Block: BlockT::Out>, - Backend: client::backend::Backend, + Backend: 'static + client::backend::Backend, Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue, - Executor: client::CallExecutor, - PoolApi: txpool::ChainApi, + Executor: 'static + client::CallExecutor, + PoolApi: 'static + txpool::ChainApi, + Api: 'static, { // Put transactions from retracted blocks back into the pool. - for r in retracted { - if let Some(block) = client.block(&BlockId::hash(*r))? { - let extrinsics = block.block.extrinsics(); - if let Err(e) = transaction_pool.submit_at( - id, - extrinsics.iter().filter(|e| { - e.is_signed().unwrap_or(false) - }).cloned(), - true - ) { + let client_copy = client.clone(); + let retracted_transactions = retracted.to_vec().into_iter() + .filter_map(move |hash| client_copy.block(&BlockId::hash(hash)).ok().unwrap_or(None)) + .flat_map(|block| block.block.deconstruct().1.into_iter()) + .filter(|tx| tx.is_signed().unwrap_or(false)); + let resubmit_future = transaction_pool + .submit_at(id, retracted_transactions, true) + .then(|resubmit_result| ready(match resubmit_result { + Ok(_) => Ok(()), + Err(e) => { warn!("Error re-submitting transactions: {:?}", e); + Ok(()) } - } - } + })) + .compat(); // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { - return Ok(()) - } - - if let Some(block) = client.block(id)? { - let parent_id = BlockId::hash(*block.block.header().parent_hash()); - let extrinsics = block.block.extrinsics(); - transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?; + return Ok(Box::new(resubmit_future)) } - Ok(()) + let block = client.block(id)?; + Ok(match block { + Some(block) => { + let parent_id = BlockId::hash(*block.block.header().parent_hash()); + let prune_future = transaction_pool + .prune(id, &parent_id, block.block.extrinsics()) + .boxed() + .compat() + .map_err(|e| { format!("{:?}", e); }); + + Box::new(resubmit_future.and_then(|_| prune_future)) + }, + None => Box::new(resubmit_future), + }) } pub(crate) fn offchain_workers( @@ -1005,6 +1019,7 @@ where #[cfg(test)] mod tests { use super::*; + use futures03::executor::block_on; use consensus_common::{BlockOrigin, SelectChain}; use substrate_test_runtime_client::{prelude::*, runtime::Transfer}; @@ -1012,7 +1027,7 @@ mod tests { fn should_remove_transactions_from_the_pool() { let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); let client = Arc::new(client); - let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); + let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone())); let transaction = Transfer { amount: 5, nonce: 0, @@ -1022,7 +1037,7 @@ mod tests { let best = longest_chain.best_chain().unwrap(); // store the transaction in the pool - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); // import the block let mut builder = client.new_block(Default::default()).unwrap(); @@ -1038,7 +1053,7 @@ mod tests { &client, &pool, &[] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 0); @@ -1049,7 +1064,7 @@ mod tests { fn should_add_reverted_transactions_to_the_pool() { let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); let client = Arc::new(client); - let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); + let pool = TransactionPool::new(Default::default(), ::transaction_pool::FullChainApi::new(client.clone())); let transaction = Transfer { amount: 5, nonce: 0, @@ -1059,7 +1074,7 @@ mod tests { let best = longest_chain.best_chain().unwrap(); // store the transaction in the pool - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); // import the block let mut builder = client.new_block(Default::default()).unwrap(); @@ -1076,7 +1091,7 @@ mod tests { &client, &pool, &[] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 0); @@ -1094,7 +1109,7 @@ mod tests { &client, &pool, &[block1_hash] - ).unwrap(); + ).unwrap().wait().unwrap(); // then assert_eq!(pool.status().ready, 1); diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 72ee2d851148a..afe5bad8f5504 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -36,8 +36,14 @@ use parking_lot::Mutex; use client::{runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; -use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent}; +use futures03::{ + future::{ready, FutureExt as _, TryFutureExt as _}, + stream::{StreamExt as _, TryStreamExt as _}, +}; +use network::{ + NetworkService, NetworkState, specialization::NetworkSpecialization, + Event, DhtEvent, PeerId, ReportHandle, +}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use primitives::{Blake2Hasher, H256}; @@ -168,6 +174,7 @@ macro_rules! new_impl { imports_external_transactions: !$config.roles.is_light(), pool: transaction_pool.clone(), client: client.clone(), + executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), }); let protocol_id = { @@ -233,12 +240,13 @@ macro_rules! new_impl { let txpool = txpool.upgrade(); if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) { - $maintain_transaction_pool( + let future = $maintain_transaction_pool( &BlockId::hash(notification.hash), - &*client, + &client, &*txpool, ¬ification.retracted, ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; + let _ = to_spawn_tx_.unbounded_send(future); } let offchain = offchain.as_ref().and_then(|o| o.upgrade()); @@ -871,6 +879,7 @@ pub struct TransactionPoolAdapter { imports_external_transactions: bool, pool: Arc

, client: Arc, + executor: TaskExecutor, } /// Get transactions for propagation. @@ -898,7 +907,7 @@ impl network::TransactionPool for TransactionPoolAdapter> where C: network::ClientHandle + Send + Sync, - PoolApi: ChainApi, + PoolApi: 'static + ChainApi, B: BlockT, H: std::hash::Hash + Eq + sr_primitives::traits::Member + sr_primitives::traits::MaybeSerialize, E: txpool::error::IntoPoolError + From, @@ -907,38 +916,40 @@ where transactions_to_propagate(&self.pool) } - fn import(&self, transaction: &::Extrinsic) -> Option { + fn hash_of(&self, transaction: &B::Extrinsic) -> H { + self.pool.hash_of(transaction) + } + + fn import(&self, report_handle: ReportHandle, who: PeerId, reputation_change: i32, transaction: B::Extrinsic) { if !self.imports_external_transactions { debug!("Transaction rejected"); - return None; + return; } let encoded = transaction.encode(); match Decode::decode(&mut &encoded[..]) { Ok(uxt) => { let best_block_id = BlockId::hash(self.client.info().chain.best_hash); - match self.pool.submit_one(&best_block_id, uxt) { - Ok(hash) => Some(hash), - Err(e) => match e.into_pool_error() { - Ok(txpool::error::Error::AlreadyImported(hash)) => { - hash.downcast::().ok() - .map(|x| x.as_ref().clone()) - }, - Ok(e) => { - debug!("Error adding transaction to the pool: {:?}", e); - None - }, - Err(e) => { - debug!("Error converting pool error: {:?}", e); - None - }, - } + let import_future = self.pool.submit_one(&best_block_id, uxt); + let import_future = import_future + .then(move |import_result| { + match import_result { + Ok(_) => report_handle.report_peer(who, reputation_change), + Err(e) => match e.into_pool_error() { + Ok(txpool::error::Error::AlreadyImported(_)) => (), + Ok(e) => debug!("Error adding transaction to the pool: {:?}", e), + Err(e) => debug!("Error converting pool error: {:?}", e), + } + } + ready(Ok(())) + }) + .compat(); + + if let Err(e) = self.executor.execute(Box::new(import_future)) { + warn!("Error scheduling extrinsic import: {:?}", e); } } - Err(e) => { - debug!("Error decoding transaction {}", e); - None - } + Err(e) => debug!("Error decoding transaction {}", e), } } @@ -950,6 +961,7 @@ where #[cfg(test)] mod tests { use super::*; + use futures03::executor::block_on; use consensus_common::SelectChain; use sr_primitives::traits::BlindCheckable; use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}}; @@ -961,7 +973,7 @@ mod tests { let client = Arc::new(client); let pool = Arc::new(TransactionPool::new( Default::default(), - transaction_pool::ChainApi::new(client.clone()) + transaction_pool::FullChainApi::new(client.clone()) )); let best = longest_chain.best_chain().unwrap(); let transaction = Transfer { @@ -970,8 +982,8 @@ mod tests { from: AccountKeyring::Alice.into(), to: Default::default(), }.into_signed_tx(); - pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); - pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1])).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); + block_on(pool.submit_one(&BlockId::hash(best.hash()), Extrinsic::IncludeData(vec![1]))).unwrap(); assert_eq!(pool.status().ready, 2); // when diff --git a/core/service/test/Cargo.toml b/core/service/test/Cargo.toml index aa3dddfc1851e..459e079c85e11 100644 --- a/core/service/test/Cargo.toml +++ b/core/service/test/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" tempdir = "0.3" tokio = "0.1.7" futures = "0.1" +futures03 = { package = "futures-preview", version = "=0.3.0-alpha.18", features = ["compat"] } log = "0.4" env_logger = "0.6" fdlimit = "0.1" diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 0540210b40a0b..2d064f965bb27 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -445,7 +445,7 @@ pub fn sync( let first_user_data = &network.full_nodes[0].2; let best_block = BlockId::number(first_service.get().client().info().chain.best_number); let extrinsic = extrinsic_factory(&first_service.get(), first_user_data); - first_service.get().transaction_pool().submit_one(&best_block, extrinsic).unwrap(); + futures03::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap(); network.run_until_all_full( |_index, service| service.get().transaction_pool().ready().count() == 1, |_index, _service| true, diff --git a/core/transaction-pool/Cargo.toml b/core/transaction-pool/Cargo.toml index 5e9973b6dc190..e55070f06e2d4 100644 --- a/core/transaction-pool/Cargo.toml +++ b/core/transaction-pool/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" +futures-preview = "=0.3.0-alpha.18" log = "0.4" codec = { package = "parity-scale-codec", version = "1.0.0" } parking_lot = "0.9.0" diff --git a/core/transaction-pool/graph/src/lib.rs b/core/transaction-pool/graph/src/lib.rs index ea890a5cd0f21..715e60874be95 100644 --- a/core/transaction-pool/graph/src/lib.rs +++ b/core/transaction-pool/graph/src/lib.rs @@ -29,6 +29,7 @@ mod listener; mod pool; mod ready; mod rotator; +mod validated_pool; pub mod base_pool; pub mod error; @@ -36,4 +37,8 @@ pub mod watcher; pub use self::error::IntoPoolError; pub use self::base_pool::{Transaction, Status}; -pub use self::pool::{Pool, Options, ChainApi, EventStream, ExtrinsicFor, BlockHash, ExHash, NumberFor, TransactionFor}; +pub use self::pool::{ + Pool, + Options, ChainApi, EventStream, ExtrinsicFor, + BlockHash, ExHash, NumberFor, TransactionFor, +}; diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 389892101ee42..53b2a62cbee89 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -15,29 +15,27 @@ // along with Substrate. If not, see . use std::{ - collections::{HashSet, HashMap}, hash, + collections::HashMap, sync::Arc, - time, }; use crate::base_pool as base; use crate::error; -use crate::listener::Listener; -use crate::rotator::PoolRotator; use crate::watcher::Watcher; use serde::Serialize; -use log::debug; -use futures::channel::mpsc; -use parking_lot::{Mutex, RwLock}; +use futures::{ + Future, FutureExt, + channel::mpsc, + future::{Either, ready, join_all}, +}; use sr_primitives::{ generic::BlockId, traits::{self, SaturatedConversion}, transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError}, }; - -pub use crate::base_pool::Limit; +use crate::validated_pool::{ValidatedPool, ValidatedTransaction}; /// Modification notification event stream type; pub type EventStream = mpsc::UnboundedReceiver<()>; @@ -52,6 +50,12 @@ pub type ExtrinsicFor = <::Block as traits::Block>::Extrinsic; pub type NumberFor = traits::NumberFor<::Block>; /// A type of transaction stored in the pool pub type TransactionFor = Arc, ExtrinsicFor>>; +/// A type of validated transaction stored in the pool. +pub type ValidatedTransactionFor = ValidatedTransaction< + ExHash, + ExtrinsicFor, + ::Error, +>; /// Concrete extrinsic validation and query logic. pub trait ChainApi: Send + Sync { @@ -61,9 +65,15 @@ pub trait ChainApi: Send + Sync { type Hash: hash::Hash + Eq + traits::Member + Serialize; /// Error type. type Error: From + error::IntoPoolError; + /// Validate transaction future. + type ValidationFuture: Future> + Send; /// Verify extrinsic at given block. - fn validate_transaction(&self, at: &BlockId, uxt: ExtrinsicFor) -> Result; + fn validate_transaction( + &self, + at: &BlockId, + uxt: ExtrinsicFor, + ) -> Self::ValidationFuture; /// Returns a block number given the block id. fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error>; @@ -79,19 +89,19 @@ pub trait ChainApi: Send + Sync { #[derive(Debug, Clone)] pub struct Options { /// Ready queue limits. - pub ready: Limit, + pub ready: base::Limit, /// Future queue limits. - pub future: Limit, + pub future: base::Limit, } impl Default for Options { fn default() -> Self { Options { - ready: Limit { + ready: base::Limit { count: 512, total_bytes: 10 * 1024 * 1024, }, - future: Limit { + future: base::Limit { count: 128, total_bytes: 1 * 1024 * 1024, }, @@ -99,125 +109,60 @@ impl Default for Options { } } -/// Extrinsics pool. +/// Extrinsics pool that performs validation. pub struct Pool { - api: B, - options: Options, - listener: RwLock, BlockHash>>, - pool: RwLock, - ExtrinsicFor, - >>, - import_notification_sinks: Mutex>>, - rotator: PoolRotator>, + validated_pool: Arc>, } impl Pool { + /// Create a new transaction pool. + pub fn new(options: Options, api: B) -> Self { + Pool { + validated_pool: Arc::new(ValidatedPool::new(options, api)), + } + } + /// Imports a bunch of unverified extrinsics to the pool pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) - -> Result, B::Error>>, B::Error> + -> impl Future, B::Error>>, B::Error>> where T: IntoIterator> { - let block_number = self.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; - - let results = xts - .into_iter() - .map(|xt| -> Result<_, B::Error> { - let (hash, bytes) = self.api.hash_and_length(&xt); - if !force && self.rotator.is_banned(&hash) { - return Err(error::Error::TemporarilyBanned.into()) - } - - match self.api.validate_transaction(at, xt.clone())? { - Ok(validity) => if validity.provides.is_empty() { - Err(error::Error::NoTagsProvided.into()) - } else { - Ok(base::Transaction { - data: xt, - bytes, - hash, - priority: validity.priority, - requires: validity.requires, - provides: validity.provides, - propagate: validity.propagate, - valid_till: block_number - .saturated_into::() - .saturating_add(validity.longevity), - }) - }, - Err(TransactionValidityError::Invalid(e)) => { - Err(error::Error::InvalidTransaction(e).into()) - }, - Err(TransactionValidityError::Unknown(e)) => { - self.listener.write().invalid(&hash); - Err(error::Error::UnknownTransaction(e).into()) - }, - } - }) - .map(|tx| { - let imported = self.pool.write().import(tx?)?; - - if let base::Imported::Ready { .. } = imported { - self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); - } - - let mut listener = self.listener.write(); - fire_events(&mut *listener, &imported); - Ok(imported.hash().clone()) - }) - .collect::>(); - - let removed = self.enforce_limits(); - - Ok(results.into_iter().map(|res| match res { - Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), - other => other, - }).collect()) - } - - fn enforce_limits(&self) -> HashSet> { - let status = self.pool.read().status(); - let ready_limit = &self.options.ready; - let future_limit = &self.options.future; - - debug!(target: "txpool", "Pool Status: {:?}", status); - - if ready_limit.is_exceeded(status.ready, status.ready_bytes) - || future_limit.is_exceeded(status.future, status.future_bytes) { - // clean up the pool - let removed = { - let mut pool = self.pool.write(); - let removed = pool.enforce_limits(ready_limit, future_limit) - .into_iter().map(|x| x.hash.clone()).collect::>(); - // ban all removed transactions - self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); - removed - }; - // run notifications - let mut listener = self.listener.write(); - for h in &removed { - listener.dropped(h, None); - } - - removed - } else { - Default::default() - } + let validated_pool = self.validated_pool.clone(); + self.verify(at, xts, force) + .map(move |validated_transactions| validated_transactions + .map(|validated_transactions| validated_pool.submit(validated_transactions))) } /// Imports one unverified extrinsic to the pool - pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, B::Error> { - Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?) + pub fn submit_one( + &self, + at: &BlockId, + xt: ExtrinsicFor, + ) -> impl Future, B::Error>> { + self.submit_at(at, std::iter::once(xt), false) + .map(|import_result| import_result.and_then(|mut import_result| import_result + .pop() + .expect("One extrinsic passed; one result returned; qed") + )) } /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, BlockHash>, B::Error> { - let hash = self.api.hash_and_length(&xt).0; - let watcher = self.listener.write().create_watcher(hash); - self.submit_one(at, xt)?; - Ok(watcher) + pub fn submit_and_watch( + &self, + at: &BlockId, + xt: ExtrinsicFor, + ) -> impl Future, BlockHash>, B::Error>> { + let block_number = match self.resolve_block_number(at) { + Ok(block_number) => block_number, + Err(err) => return Either::Left(ready(Err(err))) + }; + + let validated_pool = self.validated_pool.clone(); + Either::Right( + self.verify_one(at, block_number, xt, false) + .map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions)) + ) } /// Prunes ready transactions. @@ -226,41 +171,46 @@ impl Pool { /// To perform pruning we need the tags that each extrinsic provides and to avoid calling /// into runtime too often we first lookup all extrinsics that are in the pool and get /// their provided tags from there. Otherwise we query the runtime at the `parent` block. - pub fn prune(&self, at: &BlockId, parent: &BlockId, extrinsics: &[ExtrinsicFor]) -> Result<(), B::Error> { - let mut tags = Vec::with_capacity(extrinsics.len()); + pub fn prune( + &self, + at: &BlockId, + parent: &BlockId, + extrinsics: &[ExtrinsicFor], + ) -> impl Future> { // Get details of all extrinsics that are already in the pool - let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::>(); - let in_pool = self.pool.read().by_hash(&hashes); - { - // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) - let all = extrinsics.iter().zip(in_pool.iter()); - - for (extrinsic, existing_in_pool) in all { - match *existing_in_pool { + let (in_pool_hashes, in_pool_tags) = self.validated_pool.extrinsics_tags(extrinsics); + + // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option>)`) + let all = extrinsics.iter().zip(in_pool_tags.into_iter()); + + // Prepare future that collect tags for all extrinsics + let future_tags = join_all(all + .map(|(extrinsic, in_pool_tags)| + match in_pool_tags { // reuse the tags for extrinsics that were found in the pool - Some(ref transaction) => { - tags.extend(transaction.provides.iter().cloned()); - }, + Some(tags) => Either::Left( + ready(tags) + ), // if it's not found in the pool query the runtime at parent block // to get validity info and tags that the extrinsic provides. - None => { - let validity = self.api.validate_transaction(parent, extrinsic.clone()); - match validity { - Ok(Ok(mut validity)) => { - tags.append(&mut validity.provides); - }, + None => Either::Right(self.validated_pool.api().validate_transaction(parent, extrinsic.clone()) + .then(|validity| ready(match validity { + Ok(Ok(validity)) => validity.provides, // silently ignore invalid extrinsics, // cause they might just be inherent - _ => {} - } - }, + _ => Vec::new(), + }))), } - } - } - - self.prune_tags(at, tags, in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()))?; - - Ok(()) + )); + + // Prune transactions by tags + let at = at.clone(); + let self_clone = self.clone(); + future_tags.then(move |tags| self_clone.prune_tags( + &at, + tags.into_iter().flat_map(|tags| tags), + in_pool_hashes, + )) } /// Prunes ready transactions that provide given list of tags. @@ -273,6 +223,9 @@ impl Pool { /// 1. Provide that tag directly /// 2. Are a dependency of pruned transaction. /// + /// Returns transactions that have been removed from the pool and must be reverified + /// before reinserting to the pool. + /// /// By removing predecessor transactions as well we might actually end up /// pruning too much, so all removed transactions are reverified against /// the runtime (`validate_transaction`) to make sure they are invalid. @@ -286,200 +239,183 @@ impl Pool { at: &BlockId, tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, - ) -> Result<(), B::Error> { - // Perform tag-based pruning in the base pool - let status = self.pool.write().prune_tags(tags); - // Notify event listeners of all transactions - // that were promoted to `Ready` or were dropped. - { - let mut listener = self.listener.write(); - for promoted in &status.promoted { - fire_events(&mut *listener, promoted); - } - for f in &status.failed { - listener.dropped(f, None); - } - } - // make sure that we don't revalidate extrinsics that were part of the recently + ) -> impl Future> { + // Prune all transactions that provide given tags + let prune_status = match self.validated_pool.prune_tags(tags) { + Ok(prune_status) => prune_status, + Err(e) => return Either::Left(ready(Err(e))), + }; + + // Make sure that we don't revalidate extrinsics that were part of the recently // imported block. This is especially important for UTXO-like chains cause the // inputs are pruned so such transaction would go to future again. - self.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); + self.validated_pool.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); - // try to re-submit pruned transactions since some of them might be still valid. + // Try to re-validate pruned transactions since some of them might be still valid. // note that `known_imported_hashes` will be rejected here due to temporary ban. - let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); - let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?; - - // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). - let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { - Err(Ok(error::Error::InvalidTransaction(_))) => Some(hashes[idx].clone()), - _ => None, - }); - // Fire `pruned` notifications for collected hashes and make sure to include - // `known_imported_hashes` since they were just imported as part of the block. - let hashes = hashes.chain(known_imported_hashes.into_iter()); - { - let header_hash = self.api.block_id_to_hash(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; - let mut listener = self.listener.write(); - for h in hashes { - listener.pruned(header_hash, &h); - } - } - // perform regular cleanup of old transactions in the pool - // and update temporary bans. - self.clear_stale(at)?; - Ok(()) - } - - /// Removes stale transactions from the pool. - /// - /// Stale transactions are transaction beyond their longevity period. - /// Note this function does not remove transactions that are already included in the chain. - /// See `prune_tags` if you want this. - pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { - let block_number = self.api.block_id_to_number(at)? - .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? - .saturated_into::(); - let now = time::Instant::now(); - let to_remove = { - self.ready() - .filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx)) - .map(|tx| tx.hash.clone()) - .collect::>() - }; - let futures_to_remove: Vec> = { - let p = self.pool.read(); - let mut hashes = Vec::new(); - for tx in p.futures() { - if self.rotator.ban_if_stale(&now, block_number, &tx) { - hashes.push(tx.hash.clone()); - } - } - hashes - }; - // removing old transactions - self.remove_invalid(&to_remove); - self.remove_invalid(&futures_to_remove); - // clear banned transactions timeouts - self.rotator.clear_timeouts(&now); - - Ok(()) - } - - /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { - Pool { - api, - options, - listener: Default::default(), - pool: Default::default(), - import_notification_sinks: Default::default(), - rotator: Default::default(), - } + let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); + let pruned_transactions = prune_status.pruned.into_iter().map(|tx| tx.data.clone()); + let reverify_future = self.verify(at, pruned_transactions, false); + + // And finally - submit reverified transactions back to the pool + let at = at.clone(); + let validated_pool = self.validated_pool.clone(); + Either::Right(reverify_future.then(move |reverified_transactions| + ready(reverified_transactions.and_then(|reverified_transactions| + validated_pool.resubmit_pruned( + &at, + known_imported_hashes, + pruned_hashes, + reverified_transactions, + )) + ))) } /// Return an event stream of transactions imported to the pool. pub fn import_notification_stream(&self) -> EventStream { - let (sink, stream) = mpsc::unbounded(); - self.import_notification_sinks.lock().push(sink); - stream + self.validated_pool.import_notification_stream() } /// Invoked when extrinsics are broadcasted. pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { - let mut listener = self.listener.write(); - for (hash, peers) in propagated.into_iter() { - listener.broadcasted(&hash, peers); - } + self.validated_pool.on_broadcasted(propagated) } /// Remove from the pool. pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { - // temporarily ban invalid transactions - debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); - self.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); - - let invalid = self.pool.write().remove_invalid(hashes); - - let mut listener = self.listener.write(); - for tx in &invalid { - listener.invalid(&tx.hash); - } - - invalid + self.validated_pool.remove_invalid(hashes) } /// Get an iterator for ready transactions ordered by priority pub fn ready(&self) -> impl Iterator> { - self.pool.read().ready() + self.validated_pool.ready() } /// Returns pool status. pub fn status(&self) -> base::Status { - self.pool.read().status() + self.validated_pool.status() } /// Returns transaction hash pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { - self.api.hash_and_length(xt).0 + self.validated_pool.api().hash_and_length(xt).0 + } + + /// Resolves block number by id. + fn resolve_block_number(&self, at: &BlockId) -> Result, B::Error> { + self.validated_pool.api().block_id_to_number(at) + .and_then(|number| number.ok_or_else(|| + error::Error::InvalidBlockId(format!("{:?}", at)).into())) + } + + /// Returns future that validates a bunch of transactions at given block. + fn verify( + &self, + at: &BlockId, + xts: impl IntoIterator>, + force: bool, + ) -> impl Future>, B::Error>> { + // we need a block number to compute tx validity + let block_number = match self.resolve_block_number(at) { + Ok(block_number) => block_number, + Err(err) => return Either::Left(ready(Err(err))), + }; + + // for each xt, prepare a validation future + let validation_futures = xts.into_iter().map(move |xt| + self.verify_one(at, block_number, xt, force) + ); + + // make single validation future that waits all until all extrinsics are validated + Either::Right(join_all(validation_futures).then(|x| ready(Ok(x)))) + } + + /// Returns future that validates single transaction at given block. + fn verify_one( + &self, + block_id: &BlockId, + block_number: NumberFor, + xt: ExtrinsicFor, + force: bool, + ) -> impl Future> { + let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); + if !force && self.validated_pool.is_banned(&hash) { + return Either::Left(ready(ValidatedTransaction::Invalid(error::Error::TemporarilyBanned.into()))) + } + + Either::Right(self.validated_pool.api().validate_transaction(block_id, xt.clone()) + .then(move |validation_result| ready(match validation_result { + Ok(validity) => match validity { + Ok(validity) => if validity.provides.is_empty() { + ValidatedTransaction::Invalid(error::Error::NoTagsProvided.into()) + } else { + ValidatedTransaction::Valid(base::Transaction { + data: xt, + bytes, + hash, + priority: validity.priority, + requires: validity.requires, + provides: validity.provides, + propagate: validity.propagate, + valid_till: block_number + .saturated_into::() + .saturating_add(validity.longevity), + }) + }, + Err(TransactionValidityError::Invalid(e)) => + ValidatedTransaction::Invalid(error::Error::InvalidTransaction(e).into()), + Err(TransactionValidityError::Unknown(e)) => + ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()), + }, + Err(e) => ValidatedTransaction::Invalid(e), + }))) } } -fn fire_events( - listener: &mut Listener, - imported: &base::Imported, -) where - H: hash::Hash + Eq + traits::Member + Serialize, - H2: Clone, -{ - match *imported { - base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { - listener.ready(hash, None); - for f in failed { - listener.invalid(f); - } - for r in removed { - listener.dropped(&r.hash, Some(hash)); - } - for p in promoted { - listener.ready(p, None); - } - }, - base::Imported::Future { ref hash } => { - listener.future(hash) - }, +impl Clone for Pool { + fn clone(&self) -> Self { + Self { + validated_pool: self.validated_pool.clone(), + } } } #[cfg(test)] mod tests { + use std::{ + collections::HashMap, + time::Instant, + }; + use parking_lot::Mutex; + use futures::executor::block_on; use super::*; use sr_primitives::transaction_validity::{ValidTransaction, InvalidTransaction}; use codec::Encode; use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; use assert_matches::assert_matches; + use crate::base_pool::Limit; use crate::watcher; const INVALID_NONCE: u64 = 254; - #[derive(Debug, Default)] + #[derive(Clone, Debug, Default)] struct TestApi { - delay: Mutex>>, + delay: Arc>>>, } impl ChainApi for TestApi { type Block = Block; type Hash = u64; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; /// Verify extrinsic at given block. fn validate_transaction( &self, at: &BlockId, uxt: ExtrinsicFor, - ) -> Result { - let block_number = self.block_id_to_number(at)?.unwrap(); + ) -> Self::ValidationFuture { + let block_number = self.block_id_to_number(at).unwrap().unwrap(); let nonce = uxt.transfer().nonce; // This is used to control the test flow. @@ -492,7 +428,7 @@ mod tests { } } - if nonce < block_number { + futures::future::ready(if nonce < block_number { Ok(InvalidTransaction::Stale.into()) } else { Ok(Ok(ValidTransaction { @@ -502,7 +438,7 @@ mod tests { longevity: 3, propagate: true, })) - } + }) } /// Returns a block number given the block id. @@ -546,12 +482,12 @@ mod tests { let pool = pool(); // when - let hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); // then assert_eq!(pool.ready().map(|v| v.hash).collect::>(), vec![hash]); @@ -569,8 +505,8 @@ mod tests { }); // when - pool.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]); - let res = pool.submit_one(&BlockId::Number(0), uxt); + pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]); + let res = block_on(pool.submit_one(&BlockId::Number(0), uxt)); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -586,25 +522,25 @@ mod tests { let stream = pool.import_notification_stream(); // when - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); // future doesn't count - let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 3, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 2); assert_eq!(pool.status().future, 1); @@ -622,54 +558,54 @@ mod tests { fn should_clear_stale_transactions() { // given let pool = pool(); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); - let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); - let hash3 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + }))).unwrap(); + let hash3 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 3, - })).unwrap(); + }))).unwrap(); // when - pool.clear_stale(&BlockId::Number(5)).unwrap(); + pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap(); // then assert_eq!(pool.ready().count(), 0); assert_eq!(pool.status().future, 0); assert_eq!(pool.status().ready, 0); // make sure they are temporarily banned as well - assert!(pool.rotator.is_banned(&hash1)); - assert!(pool.rotator.is_banned(&hash2)); - assert!(pool.rotator.is_banned(&hash3)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); + assert!(pool.validated_pool.rotator().is_banned(&hash2)); + assert!(pool.validated_pool.rotator().is_banned(&hash3)); } #[test] fn should_ban_mined_transactions() { // given let pool = pool(); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); // when - pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap(); // then - assert!(pool.rotator.is_banned(&hash1)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); } #[test] @@ -684,26 +620,26 @@ mod tests { future: limit.clone(), }, TestApi::default()); - let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().future, 1); // when - let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(2)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 10, - })).unwrap(); + }))).unwrap(); // then assert_eq!(pool.status().future, 1); - assert!(pool.rotator.is_banned(&hash1)); - assert!(!pool.rotator.is_banned(&hash2)); + assert!(pool.validated_pool.rotator().is_banned(&hash1)); + assert!(!pool.validated_pool.rotator().is_banned(&hash2)); } #[test] @@ -719,12 +655,12 @@ mod tests { }, TestApi::default()); // when - pool.submit_one(&BlockId::Number(0), uxt(Transfer { + block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap_err(); + }))).unwrap_err(); // then assert_eq!(pool.status().ready, 0); @@ -737,12 +673,12 @@ mod tests { let pool = pool(); // when - let err = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + let err = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: INVALID_NONCE, - })).unwrap_err(); + }))).unwrap_err(); // then assert_eq!(pool.status().ready, 0); @@ -757,17 +693,17 @@ mod tests { fn should_trigger_ready_and_finalized() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.status().future, 0); // when - pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -782,17 +718,17 @@ mod tests { fn should_trigger_ready_and_finalized_when_pruning_via_hash() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.status().future, 0); // when - pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -807,22 +743,22 @@ mod tests { fn should_trigger_future_and_ready_after_promoted() { // given let pool = pool(); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 1, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 1); // when - pool.submit_one(&BlockId::Number(0), uxt(Transfer { + block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, - })).unwrap(); + }))).unwrap(); assert_eq!(pool.status().ready, 2); // then @@ -841,11 +777,11 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); assert_eq!(pool.status().ready, 1); // when - pool.remove_invalid(&[*watcher.hash()]); + pool.validated_pool.remove_invalid(&[*watcher.hash()]); // then @@ -865,7 +801,7 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); assert_eq!(pool.status().ready, 1); // when @@ -899,7 +835,7 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = pool.submit_and_watch(&BlockId::Number(0), xt).unwrap(); + let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // when @@ -909,7 +845,7 @@ mod tests { amount: 4, nonce: 1, }); - pool.submit_one(&BlockId::Number(1), xt).unwrap(); + block_on(pool.submit_one(&BlockId::Number(1), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // then @@ -925,7 +861,7 @@ mod tests { let (ready, is_ready) = std::sync::mpsc::sync_channel(0); let (tx, rx) = std::sync::mpsc::sync_channel(1); let mut api = TestApi::default(); - api.delay = Mutex::new(rx.into()); + api.delay = Arc::new(Mutex::new(rx.into())); let pool = Arc::new(Pool::new(Default::default(), api)); // when @@ -939,7 +875,7 @@ mod tests { // This transaction should go to future, since we use `nonce: 1` let pool2 = pool.clone(); std::thread::spawn(move || { - pool2.submit_one(&BlockId::Number(0), xt).unwrap(); + block_on(pool2.submit_one(&BlockId::Number(0), xt)).unwrap(); ready.send(()).unwrap(); }); @@ -953,11 +889,11 @@ mod tests { }); // The tag the above transaction provides (TestApi is using just nonce as u8) let provides = vec![0_u8]; - pool.submit_one(&BlockId::Number(0), xt).unwrap(); + block_on(pool.submit_one(&BlockId::Number(0), xt)).unwrap(); assert_eq!(pool.status().ready, 1); // Now block import happens before the second transaction is able to finish verification. - pool.prune_tags(&BlockId::Number(1), vec![provides], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); assert_eq!(pool.status().ready, 0); diff --git a/core/transaction-pool/graph/src/validated_pool.rs b/core/transaction-pool/graph/src/validated_pool.rs new file mode 100644 index 0000000000000..9bf1012628645 --- /dev/null +++ b/core/transaction-pool/graph/src/validated_pool.rs @@ -0,0 +1,371 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::{HashSet, HashMap}, + hash, + time, +}; + +use crate::base_pool as base; +use crate::error; +use crate::listener::Listener; +use crate::rotator::PoolRotator; +use crate::watcher::Watcher; +use serde::Serialize; +use log::debug; + +use futures::channel::mpsc; +use parking_lot::{Mutex, RwLock}; +use sr_primitives::{ + generic::BlockId, + traits::{self, SaturatedConversion}, + transaction_validity::TransactionTag as Tag, +}; + +use crate::base_pool::PruneStatus; +use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor}; + +/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum. +#[derive(Debug)] +pub enum ValidatedTransaction { + /// Transaction that has been validated successfully. + Valid(base::Transaction), + /// Transaction that is invalid. + Invalid(Error), + /// Transaction which validity can't be determined. + /// + /// We're notifying watchers about failure, if 'unknown' transaction is submitted. + Unknown(Hash, Error), +} + +/// A type of validated transaction stored in the pool. +pub type ValidatedTransactionFor = ValidatedTransaction< + ExHash, + ExtrinsicFor, + ::Error, +>; + +/// Pool that deals with validated transactions. +pub(crate) struct ValidatedPool { + api: B, + options: Options, + listener: RwLock, BlockHash>>, + pool: RwLock, + ExtrinsicFor, + >>, + import_notification_sinks: Mutex>>, + rotator: PoolRotator>, +} + +impl ValidatedPool { + /// Create a new transaction pool. + pub fn new(options: Options, api: B) -> Self { + ValidatedPool { + api, + options, + listener: Default::default(), + pool: Default::default(), + import_notification_sinks: Default::default(), + rotator: Default::default(), + } + } + + /// Bans given set of hashes. + pub fn ban(&self, now: &std::time::Instant, hashes: impl IntoIterator>) { + self.rotator.ban(now, hashes) + } + + /// Returns true if transaction with given hash is currently banned from the pool. + pub fn is_banned(&self, hash: &ExHash) -> bool { + self.rotator.is_banned(hash) + } + + /// Imports a bunch of pre-validated transactions to the pool. + pub fn submit(&self, txs: T) -> Vec, B::Error>> where + T: IntoIterator> + { + let results = txs.into_iter() + .map(|validated_tx| self.submit_one(validated_tx)) + .collect::>(); + + let removed = self.enforce_limits(); + + results.into_iter().map(|res| match res { + Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()), + other => other, + }).collect() + } + + /// Submit single pre-validated transaction to the pool. + fn submit_one(&self, tx: ValidatedTransactionFor) -> Result, B::Error> { + match tx { + ValidatedTransaction::Valid(tx) => { + let imported = self.pool.write().import(tx)?; + + if let base::Imported::Ready { .. } = imported { + self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + } + + let mut listener = self.listener.write(); + fire_events(&mut *listener, &imported); + Ok(imported.hash().clone()) + } + ValidatedTransaction::Invalid(err) => Err(err.into()), + ValidatedTransaction::Unknown(hash, err) => { + self.listener.write().invalid(&hash); + Err(err.into()) + } + } + } + + fn enforce_limits(&self) -> HashSet> { + let status = self.pool.read().status(); + let ready_limit = &self.options.ready; + let future_limit = &self.options.future; + + debug!(target: "txpool", "Pool Status: {:?}", status); + + if ready_limit.is_exceeded(status.ready, status.ready_bytes) + || future_limit.is_exceeded(status.future, status.future_bytes) { + // clean up the pool + let removed = { + let mut pool = self.pool.write(); + let removed = pool.enforce_limits(ready_limit, future_limit) + .into_iter().map(|x| x.hash.clone()).collect::>(); + // ban all removed transactions + self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone())); + removed + }; + // run notifications + let mut listener = self.listener.write(); + for h in &removed { + listener.dropped(h, None); + } + + removed + } else { + Default::default() + } + } + + /// Import a single extrinsic and starts to watch their progress in the pool. + pub fn submit_and_watch( + &self, + tx: ValidatedTransactionFor, + ) -> Result, BlockHash>, B::Error> { + match tx { + ValidatedTransaction::Valid(tx) => { + let hash = self.api.hash_and_length(&tx.data).0; + let watcher = self.listener.write().create_watcher(hash); + self.submit(std::iter::once(ValidatedTransaction::Valid(tx))) + .pop() + .expect("One extrinsic passed; one result returned; qed") + .map(|_| watcher) + }, + ValidatedTransaction::Invalid(err) => Err(err.into()), + ValidatedTransaction::Unknown(_, err) => Err(err.into()), + } + } + + /// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown). + pub fn extrinsics_tags(&self, extrinsics: &[ExtrinsicFor]) -> (Vec>, Vec>>) { + let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::>(); + let in_pool = self.pool.read().by_hash(&hashes); + ( + hashes, + in_pool.into_iter() + .map(|existing_in_pool| existing_in_pool + .map(|transaction| transaction.provides.iter().cloned() + .collect())) + .collect(), + ) + } + + /// Prunes ready transactions that provide given list of tags. + pub fn prune_tags( + &self, + tags: impl IntoIterator, + ) -> Result, ExtrinsicFor>, B::Error> { + // Perform tag-based pruning in the base pool + let status = self.pool.write().prune_tags(tags); + // Notify event listeners of all transactions + // that were promoted to `Ready` or were dropped. + { + let mut listener = self.listener.write(); + for promoted in &status.promoted { + fire_events(&mut *listener, promoted); + } + for f in &status.failed { + listener.dropped(f, None); + } + } + + Ok(status) + } + + /// Resubmit transactions that have been revalidated after prune_tags call. + pub fn resubmit_pruned( + &self, + at: &BlockId, + known_imported_hashes: impl IntoIterator> + Clone, + pruned_hashes: Vec>, + pruned_xts: Vec>, + ) -> Result<(), B::Error> { + debug_assert_eq!(pruned_hashes.len(), pruned_xts.len()); + + // Resubmit pruned transactions + let results = self.submit(pruned_xts); + + // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). + let hashes = results + .into_iter() + .enumerate() + .filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { + Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx].clone()), + _ => None, + }); + // Fire `pruned` notifications for collected hashes and make sure to include + // `known_imported_hashes` since they were just imported as part of the block. + let hashes = hashes.chain(known_imported_hashes.into_iter()); + { + let header_hash = self.api.block_id_to_hash(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; + let mut listener = self.listener.write(); + for h in hashes { + listener.pruned(header_hash, &h); + } + } + // perform regular cleanup of old transactions in the pool + // and update temporary bans. + self.clear_stale(at)?; + Ok(()) + } + + /// Removes stale transactions from the pool. + /// + /// Stale transactions are transaction beyond their longevity period. + /// Note this function does not remove transactions that are already included in the chain. + /// See `prune_tags` if you want this. + pub fn clear_stale(&self, at: &BlockId) -> Result<(), B::Error> { + let block_number = self.api.block_id_to_number(at)? + .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())? + .saturated_into::(); + let now = time::Instant::now(); + let to_remove = { + self.ready() + .filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx)) + .map(|tx| tx.hash.clone()) + .collect::>() + }; + let futures_to_remove: Vec> = { + let p = self.pool.read(); + let mut hashes = Vec::new(); + for tx in p.futures() { + if self.rotator.ban_if_stale(&now, block_number, &tx) { + hashes.push(tx.hash.clone()); + } + } + hashes + }; + // removing old transactions + self.remove_invalid(&to_remove); + self.remove_invalid(&futures_to_remove); + // clear banned transactions timeouts + self.rotator.clear_timeouts(&now); + + Ok(()) + } + + /// Get rotator reference. + #[cfg(test)] + pub fn rotator(&self) -> &PoolRotator> { + &self.rotator + } + + /// Get api reference. + pub fn api(&self) -> &B { + &self.api + } + + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> EventStream { + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + let mut listener = self.listener.write(); + for (hash, peers) in propagated.into_iter() { + listener.broadcasted(&hash, peers); + } + } + + /// Remove from the pool. + pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + // temporarily ban invalid transactions + debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); + self.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); + + let invalid = self.pool.write().remove_invalid(hashes); + + let mut listener = self.listener.write(); + for tx in &invalid { + listener.invalid(&tx.hash); + } + + invalid + } + + /// Get an iterator for ready transactions ordered by priority + pub fn ready(&self) -> impl Iterator> { + self.pool.read().ready() + } + + /// Returns pool status. + pub fn status(&self) -> base::Status { + self.pool.read().status() + } +} + +fn fire_events( + listener: &mut Listener, + imported: &base::Imported, +) where + H: hash::Hash + Eq + traits::Member + Serialize, + H2: Clone, +{ + match *imported { + base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { + listener.ready(hash, None); + for f in failed { + listener.invalid(f); + } + for r in removed { + listener.dropped(&r.hash, Some(hash)); + } + for p in promoted { + listener.ready(p, None); + } + }, + base::Imported::Future { ref hash } => { + listener.future(hash) + }, + } +} diff --git a/core/transaction-pool/src/api.rs b/core/transaction-pool/src/api.rs index c0c4c787a5012..96403bd3f876f 100644 --- a/core/transaction-pool/src/api.rs +++ b/core/transaction-pool/src/api.rs @@ -37,24 +37,24 @@ use sr_primitives::{ use crate::error; /// The transaction pool logic -pub struct ChainApi { +pub struct FullChainApi { client: Arc, _marker: PhantomData, } -impl ChainApi where +impl FullChainApi where Block: traits::Block, T: traits::ProvideRuntimeApi + HeaderBackend { /// Create new transaction pool logic. pub fn new(client: Arc) -> Self { - ChainApi { + FullChainApi { client, _marker: Default::default() } } } -impl txpool::ChainApi for ChainApi where +impl txpool::ChainApi for FullChainApi where Block: traits::Block, T: traits::ProvideRuntimeApi + HeaderBackend, T::Api: TaggedTransactionQueue @@ -62,9 +62,14 @@ impl txpool::ChainApi for ChainApi where type Block = Block; type Hash = H256; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; - fn validate_transaction(&self, at: &BlockId, uxt: txpool::ExtrinsicFor) -> error::Result { - Ok(self.client.runtime_api().validate_transaction(at, uxt)?) + fn validate_transaction( + &self, + at: &BlockId, + uxt: txpool::ExtrinsicFor, + ) -> Self::ValidationFuture { + futures::future::ready(self.client.runtime_api().validate_transaction(at, uxt).map_err(Into::into)) } fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { diff --git a/core/transaction-pool/src/lib.rs b/core/transaction-pool/src/lib.rs index 1899c601b2fdb..6938166299d85 100644 --- a/core/transaction-pool/src/lib.rs +++ b/core/transaction-pool/src/lib.rs @@ -25,5 +25,5 @@ mod tests; pub mod error; -pub use api::ChainApi; +pub use api::FullChainApi; pub use txpool; diff --git a/core/transaction-pool/src/tests.rs b/core/transaction-pool/src/tests.rs index 1661b7108b9f8..d1ad27dd260f0 100644 --- a/core/transaction-pool/src/tests.rs +++ b/core/transaction-pool/src/tests.rs @@ -18,6 +18,7 @@ use super::*; use codec::Encode; +use futures::executor::block_on; use txpool::{self, Pool}; use test_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; use sr_primitives::{ @@ -38,12 +39,13 @@ impl txpool::ChainApi for TestApi { type Block = Block; type Hash = Hash; type Error = error::Error; + type ValidationFuture = futures::future::Ready>; fn validate_transaction( &self, at: &BlockId, uxt: txpool::ExtrinsicFor, - ) -> error::Result { + ) -> Self::ValidationFuture { let expected = index(at); let requires = if expected == uxt.transfer().nonce { vec![] @@ -52,7 +54,7 @@ impl txpool::ChainApi for TestApi { }; let provides = vec![vec![uxt.transfer().nonce as u8]]; - Ok( + futures::future::ready(Ok( Ok(ValidTransaction { priority: 1, requires, @@ -60,7 +62,7 @@ impl txpool::ChainApi for TestApi { longevity: 64, propagate: true, }) - ) + )) } fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { @@ -111,7 +113,7 @@ fn pool() -> Pool { fn submission_should_work() { let pool = pool(); assert_eq!(209, index(&BlockId::number(0))); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209]); @@ -120,8 +122,8 @@ fn submission_should_work() { #[test] fn multiple_submission_should_work() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); @@ -130,7 +132,7 @@ fn multiple_submission_should_work() { #[test] fn early_nonce_should_be_culled() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); @@ -140,11 +142,11 @@ fn early_nonce_should_be_culled() { fn late_nonce_should_be_queued() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); } @@ -152,13 +154,13 @@ fn late_nonce_should_be_queued() { #[test] fn prune_tags_should_work() { let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); - pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![]).unwrap(); + block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![210]); @@ -168,14 +170,14 @@ fn prune_tags_should_work() { fn should_ban_invalid_transactions() { let pool = pool(); let uxt = uxt(Alice, 209); - let hash = pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap(); + let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); pool.remove_invalid(&[hash]); - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); // when let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::::new()); // then - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); + block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); } diff --git a/node-template/src/service.rs b/node-template/src/service.rs index 843ccb7ee3094..24b22082c5de7 100644 --- a/node-template/src/service.rs +++ b/node-template/src/service.rs @@ -42,7 +42,7 @@ macro_rules! new_full_start { Ok(substrate_client::LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::ChainApi::new(client))) + Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::FullChainApi::new(client))) )? .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { let select_chain = select_chain.take() @@ -187,7 +187,7 @@ pub fn new_light(config: Configuration(config: NodeConfiguration) LongestChain, NetworkStatus, NetworkService::Hash>, - TransactionPool>, + TransactionPool>, OffchainWorkers< ConcreteClient, >::OffchainStorage, @@ -275,7 +274,7 @@ pub fn new_light(config: NodeConfiguration) Ok(LongestChain::new(backend.clone())) })? .with_transaction_pool(|config, client| - Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) + Ok(TransactionPool::new(config, transaction_pool::FullChainApi::new(client))) )? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { let fetch_checker = fetcher diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index 1bf7bae384ff8..c2cee10b3a099 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -24,3 +24,4 @@ transaction_pool = { package = "substrate-transaction-pool", path = "../../core/ node-testing = { path = "../testing" } node-runtime = { path = "../runtime" } env_logger = "0.6" +futures03 = { package = "futures-preview", version = "=0.3.0-alpha.18" } diff --git a/node/rpc/src/accounts.rs b/node/rpc/src/accounts.rs index 9586b8b3893b9..6c8e60736ac53 100644 --- a/node/rpc/src/accounts.rs +++ b/node/rpc/src/accounts.rs @@ -111,6 +111,7 @@ where mod tests { use super::*; + use futures03::executor::block_on; use node_runtime::{CheckedExtrinsic, Call, TimestampCall}; use codec::Decode; use node_testing::{ @@ -125,7 +126,7 @@ mod tests { // given let _ = env_logger::try_init(); let client = Arc::new(TestClientBuilder::new().build()); - let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))); + let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))); let new_transaction = |extra| { let ex = CheckedExtrinsic { @@ -139,9 +140,9 @@ mod tests { }; // Populate the pool let ext0 = new_transaction(signed_extra(0, 0)); - pool.submit_one(&BlockId::number(0), ext0).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), ext0)).unwrap(); let ext1 = new_transaction(signed_extra(1, 0)); - pool.submit_one(&BlockId::number(0), ext1).unwrap(); + block_on(pool.submit_one(&BlockId::number(0), ext1)).unwrap(); let accounts = Accounts::new(client, pool);