Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add transactions from retracted blocks back to the pool #3562

Merged
merged 3 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ mod tests {
let chain_api = transaction_pool::ChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));

txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)]).unwrap();
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();

let mut proposer_factory = ProposerFactory {
client: client.clone(),
Expand Down
19 changes: 10 additions & 9 deletions core/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,23 @@ pub type StorageCollection = Vec<(Vec<u8>, Option<Vec<u8>>)>;
/// In memory arrays of storage values for multiple child tries.
pub type ChildStorageCollection = Vec<(Vec<u8>, StorageCollection)>;

pub(crate) struct ImportSummary<Block: BlockT> {
pub(crate) hash: Block::Hash,
pub(crate) origin: BlockOrigin,
pub(crate) header: Block::Header,
pub(crate) is_new_best: bool,
pub(crate) storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
pub(crate) retracted: Vec<Block::Hash>,
}

/// Import operation wrapper
pub struct ClientImportOperation<
Block: BlockT,
H: Hasher<Out=Block::Hash>,
B: Backend<Block, H>,
> {
pub(crate) op: B::BlockImportOperation,
pub(crate) notify_imported: Option<(
Block::Hash,
BlockOrigin,
Block::Header,
bool,
Option<(
StorageCollection,
ChildStorageCollection,
)>)>,
pub(crate) notify_imported: Option<ImportSummary<Block>>,
pub(crate) notify_finalized: Vec<Block::Hash>,
}

Expand Down
49 changes: 30 additions & 19 deletions core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{
},
backend::{
self, BlockImportOperation, PrunableStateChangesTrieStorage,
ClientImportOperation, Finalizer,
ClientImportOperation, Finalizer, ImportSummary,
},
blockchain::{
self, Info as ChainInfo, Backend as ChainBackend,
Expand Down Expand Up @@ -199,6 +199,8 @@ pub struct BlockImportNotification<Block: BlockT> {
pub header: Block::Header,
/// Is this the new best block.
pub is_new_best: bool,
/// List of retracted blocks ordered by block number.
pub retracted: Vec<Block::Hash>,
}

/// Summary of a finalized block.
Expand Down Expand Up @@ -968,6 +970,17 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
crate::backend::NewBlockState::Normal
};

let retracted = if is_new_best {
let route_from_best = crate::blockchain::tree_route(
|id| self.header(&id)?.ok_or_else(|| Error::UnknownBlock(format!("{:?}", id))),
BlockId::Hash(info.best_hash),
BlockId::Hash(parent_hash),
)?;
route_from_best.retracted().iter().map(|e| e.hash.clone()).collect()
} else {
Vec::default()
};

trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin);

operation.op.set_block_data(
Expand Down Expand Up @@ -995,7 +1008,14 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
operation.notify_finalized.push(hash);
}

operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes));
operation.notify_imported = Some(ImportSummary {
hash,
origin,
header: import_headers.into_post(),
is_new_best,
storage_changes,
retracted,
})
}

Ok(ImportResult::imported(is_new_best))
Expand Down Expand Up @@ -1167,33 +1187,24 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where

fn notify_imported(
&self,
notify_import: (
Block::Hash, BlockOrigin,
Block::Header,
bool,
Option<(
Vec<(Vec<u8>, Option<Vec<u8>>)>,
Vec<(Vec<u8>, Vec<(Vec<u8>, Option<Vec<u8>>)>)>,
)
>),
notify_import: ImportSummary<Block>,
) -> error::Result<()> {
let (hash, origin, header, is_new_best, storage_changes) = notify_import;

if let Some(storage_changes) = storage_changes {
if let Some(storage_changes) = notify_import.storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(
&hash,
&notify_import.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}

let notification = BlockImportNotification::<Block> {
hash,
origin,
header,
is_new_best,
hash: notify_import.hash,
origin: notify_import.origin,
header: notify_import.header,
is_new_best: notify_import.is_new_best,
retracted: notify_import.retracted,
};

self.import_notification_sinks.lock()
Expand Down
1 change: 1 addition & 0 deletions core/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ mod tests {
origin: BlockOrigin::File,
header,
is_new_best: false,
retracted: vec![],
}).unwrap();
}
}
Expand Down
70 changes: 69 additions & 1 deletion core/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ ServiceBuilder<
rpc_extensions
))
},
|h, c, tx| maintain_transaction_pool(h, c, tx),
|h, c, tx, r| maintain_transaction_pool(h, c, tx, r),
|n, o, p, ns, v| offchain_workers(n, o, p, ns, v),
|c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks),
)
Expand Down Expand Up @@ -885,6 +885,7 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
id: &BlockId<Block>,
client: &Client<Backend, Executor, Block, Api>,
transaction_pool: &TransactionPool<PoolApi>,
retracted: &[Block::Hash],
) -> error::Result<()> where
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher>,
Expand All @@ -893,6 +894,16 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
Executor: client::CallExecutor<Block, Blake2Hasher>,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
{
// Put transactions from retracted blocks back into the pool.
for r in retracted {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it makes sense to change to retracted.into_iter().rev(), so that depending transactions are imported in-order and not moving to/from future/ready queues? I mean if block #5 had a TX1 that provides OUT1 and block #6 had a TX2 that requires OUT1, then if we first import TX2, then it'll be inserted to future queue (because we miss OUT1). And after we import TX1, it'll be moved to ready queue (because TX1 provides OUT1). If we import TX1 first, then both transactions will be inserted in ready queue immediately.

(and hashes in retracted are in reverse order, afaik)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes retracted in notification to be in ascending order, as documented.

if let Some(block) = client.block(&BlockId::hash(*r))? {
let extrinsics = block.block.extrinsics();
if let Err(e) = transaction_pool.submit_at(id, extrinsics.iter().cloned(), true) {
warn!("Error re-submitting transactions: {:?}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be triggered quite often in case of re-orgs. I would either lower this to debug or figure out how to detect transactions that were already included in another chain and print a warning only for other transactions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

submit_at pushes a list of transactions. I believe it only fails when there's something fundamentally wrong with the pool. Otherwise it returns a list of individual results that we ignore here.

}
}
}

// Avoid calling into runtime if there is nothing to prune from the pool anyway.
if transaction_pool.status().is_empty() {
return Ok(())
Expand Down Expand Up @@ -968,10 +979,67 @@ mod tests {
&id,
&client,
&pool,
&[]
).unwrap();

// then
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
}

#[test]
fn should_add_reverted_transactions_to_the_pool() {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()));
let transaction = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
let best = longest_chain.best_chain().unwrap();

// store the transaction in the pool
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();

// import the block
let mut builder = client.new_block(Default::default()).unwrap();
builder.push(transaction.clone()).unwrap();
let block = builder.bake().unwrap();
let block1_hash = block.header().hash();
let id = BlockId::hash(block1_hash.clone());
client.import(BlockOrigin::Own, block).unwrap();

// fire notification - this should clean up the queue
assert_eq!(pool.status().ready, 1);
maintain_transaction_pool(
&id,
&client,
&pool,
&[]
).unwrap();

// then
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);

// import second block
let builder = client.new_block_at(&BlockId::hash(best.hash()), Default::default()).unwrap();
let block = builder.bake().unwrap();
let id = BlockId::hash(block.header().hash());
client.import(BlockOrigin::Own, block).unwrap();

// fire notification - this should add the transaction back to the pool.
maintain_transaction_pool(
&id,
&client,
&pool,
&[block1_hash]
).unwrap();

// then
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
}
}
1 change: 1 addition & 0 deletions core/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ macro_rules! new_impl {
&BlockId::hash(notification.hash),
&*client,
&*txpool,
&notification.retracted,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
}

Expand Down
10 changes: 6 additions & 4 deletions core/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ pub struct Pool<B: ChainApi> {

impl<B: ChainApi> Pool<B> {
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
-> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>
where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
let block_number = self.api.block_id_to_number(at)?
Expand All @@ -124,7 +126,7 @@ impl<B: ChainApi> Pool<B> {
.into_iter()
.map(|xt| -> Result<_, B::Error> {
let (hash, bytes) = self.api.hash_and_length(&xt);
if self.rotator.is_banned(&hash) {
if !force && self.rotator.is_banned(&hash) {
return Err(error::Error::TemporarilyBanned.into())
}

Expand Down Expand Up @@ -207,7 +209,7 @@ impl<B: ChainApi> Pool<B> {

/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?)
}

/// Import a single extrinsic and starts to watch their progress in the pool.
Expand Down Expand Up @@ -306,7 +308,7 @@ impl<B: ChainApi> Pool<B> {
// try to re-submit pruned transactions since some of them might be still valid.
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?;

// Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned).
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Expand Down