From e169b372784fa9a493c377b2c385baa79e18a750 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Fri, 6 Sep 2019 21:30:43 +0200 Subject: [PATCH] Add transactions from retracted blocks back to the pool (#3562) * Add transactions from retracted blocks back to the pool * Line width * Reverse retracted --- core/basic-authorship/src/basic_authorship.rs | 2 +- core/client/src/backend.rs | 19 ++--- core/client/src/client.rs | 49 ++++++++----- core/finality-grandpa/src/until_imported.rs | 1 + core/service/src/builder.rs | 70 ++++++++++++++++++- core/service/src/lib.rs | 1 + core/transaction-pool/graph/src/pool.rs | 10 +-- 7 files changed, 118 insertions(+), 34 deletions(-) diff --git a/core/basic-authorship/src/basic_authorship.rs b/core/basic-authorship/src/basic_authorship.rs index ffc4d766c9bd3..59b12ba1e40b4 100644 --- a/core/basic-authorship/src/basic_authorship.rs +++ b/core/basic-authorship/src/basic_authorship.rs @@ -250,7 +250,7 @@ mod tests { let chain_api = transaction_pool::ChainApi::new(client.clone()); let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api)); - txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)]).unwrap(); + txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap(); let mut proposer_factory = ProposerFactory { client: client.clone(), diff --git a/core/client/src/backend.rs b/core/client/src/backend.rs index 457a1b86eccdf..ad683ebac1e56 100644 --- a/core/client/src/backend.rs +++ b/core/client/src/backend.rs @@ -35,6 +35,15 @@ pub type StorageCollection = Vec<(Vec, Option>)>; /// In memory arrays of storage values for multiple child tries. pub type ChildStorageCollection = Vec<(Vec, StorageCollection)>; +pub(crate) struct ImportSummary { + pub(crate) hash: Block::Hash, + pub(crate) origin: BlockOrigin, + pub(crate) header: Block::Header, + pub(crate) is_new_best: bool, + pub(crate) storage_changes: Option<(StorageCollection, ChildStorageCollection)>, + pub(crate) retracted: Vec, +} + /// Import operation wrapper pub struct ClientImportOperation< Block: BlockT, @@ -42,15 +51,7 @@ pub struct ClientImportOperation< B: Backend, > { pub(crate) op: B::BlockImportOperation, - pub(crate) notify_imported: Option<( - Block::Hash, - BlockOrigin, - Block::Header, - bool, - Option<( - StorageCollection, - ChildStorageCollection, - )>)>, + pub(crate) notify_imported: Option>, pub(crate) notify_finalized: Vec, } diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 50e962c8b505c..90cdff9fa804b 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -62,7 +62,7 @@ use crate::{ }, backend::{ self, BlockImportOperation, PrunableStateChangesTrieStorage, - ClientImportOperation, Finalizer, + ClientImportOperation, Finalizer, ImportSummary, }, blockchain::{ self, Info as ChainInfo, Backend as ChainBackend, @@ -199,6 +199,8 @@ pub struct BlockImportNotification { pub header: Block::Header, /// Is this the new best block. pub is_new_best: bool, + /// List of retracted blocks ordered by block number. + pub retracted: Vec, } /// Summary of a finalized block. @@ -968,6 +970,17 @@ impl Client where crate::backend::NewBlockState::Normal }; + let retracted = if is_new_best { + let route_from_best = crate::blockchain::tree_route( + |id| self.header(&id)?.ok_or_else(|| Error::UnknownBlock(format!("{:?}", id))), + BlockId::Hash(info.best_hash), + BlockId::Hash(parent_hash), + )?; + route_from_best.retracted().iter().rev().map(|e| e.hash.clone()).collect() + } else { + Vec::default() + }; + trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin); operation.op.set_block_data( @@ -995,7 +1008,14 @@ impl Client where operation.notify_finalized.push(hash); } - operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes)); + operation.notify_imported = Some(ImportSummary { + hash, + origin, + header: import_headers.into_post(), + is_new_best, + storage_changes, + retracted, + }) } Ok(ImportResult::imported(is_new_best)) @@ -1167,33 +1187,24 @@ impl Client where fn notify_imported( &self, - notify_import: ( - Block::Hash, BlockOrigin, - Block::Header, - bool, - Option<( - Vec<(Vec, Option>)>, - Vec<(Vec, Vec<(Vec, Option>)>)>, - ) - >), + notify_import: ImportSummary, ) -> error::Result<()> { - let (hash, origin, header, is_new_best, storage_changes) = notify_import; - - if let Some(storage_changes) = storage_changes { + if let Some(storage_changes) = notify_import.storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? self.storage_notifications.lock() .trigger( - &hash, + ¬ify_import.hash, storage_changes.0.into_iter(), storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), ); } let notification = BlockImportNotification:: { - hash, - origin, - header, - is_new_best, + hash: notify_import.hash, + origin: notify_import.origin, + header: notify_import.header, + is_new_best: notify_import.is_new_best, + retracted: notify_import.retracted, }; self.import_notification_sinks.lock() diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index af797c99ab39b..4232bfacdfcbd 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -472,6 +472,7 @@ mod tests { origin: BlockOrigin::File, header, is_new_best: false, + retracted: vec![], }).unwrap(); } } diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index 53cec940d7cc5..1540eeac9cdb6 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -871,7 +871,7 @@ ServiceBuilder< dht_event_tx, )) }, - |h, c, tx| maintain_transaction_pool(h, c, tx), + |h, c, tx, r| maintain_transaction_pool(h, c, tx, r), |n, o, p, ns, v| offchain_workers(n, o, p, ns, v), |c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks), ) @@ -924,6 +924,7 @@ pub(crate) fn maintain_transaction_pool( id: &BlockId, client: &Client, transaction_pool: &TransactionPool, + retracted: &[Block::Hash], ) -> error::Result<()> where Block: BlockT::Out>, Backend: client::backend::Backend, @@ -932,6 +933,16 @@ pub(crate) fn maintain_transaction_pool( Executor: client::CallExecutor, PoolApi: txpool::ChainApi, { + // Put transactions from retracted blocks back into the pool. + for r in retracted { + if let Some(block) = client.block(&BlockId::hash(*r))? { + let extrinsics = block.block.extrinsics(); + if let Err(e) = transaction_pool.submit_at(id, extrinsics.iter().cloned(), true) { + warn!("Error re-submitting transactions: {:?}", e); + } + } + } + // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { return Ok(()) @@ -1007,10 +1018,67 @@ mod tests { &id, &client, &pool, + &[] ).unwrap(); // then assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); } + + #[test] + fn should_add_reverted_transactions_to_the_pool() { + let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); + let client = Arc::new(client); + let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); + let transaction = Transfer { + amount: 5, + nonce: 0, + from: AccountKeyring::Alice.into(), + to: Default::default(), + }.into_signed_tx(); + let best = longest_chain.best_chain().unwrap(); + + // store the transaction in the pool + pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); + + // import the block + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push(transaction.clone()).unwrap(); + let block = builder.bake().unwrap(); + let block1_hash = block.header().hash(); + let id = BlockId::hash(block1_hash.clone()); + client.import(BlockOrigin::Own, block).unwrap(); + + // fire notification - this should clean up the queue + assert_eq!(pool.status().ready, 1); + maintain_transaction_pool( + &id, + &client, + &pool, + &[] + ).unwrap(); + + // then + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 0); + + // import second block + let builder = client.new_block_at(&BlockId::hash(best.hash()), Default::default()).unwrap(); + let block = builder.bake().unwrap(); + let id = BlockId::hash(block.header().hash()); + client.import(BlockOrigin::Own, block).unwrap(); + + // fire notification - this should add the transaction back to the pool. + maintain_transaction_pool( + &id, + &client, + &pool, + &[block1_hash] + ).unwrap(); + + // then + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 0); + } } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 9fc305560f45a..48d5069701dcc 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -238,6 +238,7 @@ macro_rules! new_impl { &BlockId::hash(notification.hash), &*client, &*txpool, + ¬ification.retracted, ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; } diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 75a4bc4394f40..389892101ee42 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -114,7 +114,9 @@ pub struct Pool { impl Pool { /// Imports a bunch of unverified extrinsics to the pool - pub fn submit_at(&self, at: &BlockId, xts: T) -> Result, B::Error>>, B::Error> where + pub fn submit_at(&self, at: &BlockId, xts: T, force: bool) + -> Result, B::Error>>, B::Error> + where T: IntoIterator> { let block_number = self.api.block_id_to_number(at)? @@ -124,7 +126,7 @@ impl Pool { .into_iter() .map(|xt| -> Result<_, B::Error> { let (hash, bytes) = self.api.hash_and_length(&xt); - if self.rotator.is_banned(&hash) { + if !force && self.rotator.is_banned(&hash) { return Err(error::Error::TemporarilyBanned.into()) } @@ -207,7 +209,7 @@ impl Pool { /// Imports one unverified extrinsic to the pool pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, B::Error> { - Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?) + Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?) } /// Import a single extrinsic and starts to watch their progress in the pool. @@ -306,7 +308,7 @@ impl Pool { // try to re-submit pruned transactions since some of them might be still valid. // note that `known_imported_hashes` will be rejected here due to temporary ban. let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); - let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?; + let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?; // Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned). let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {