Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Revalidation tweak & logging for transaction pool #6258

Merged
merged 7 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ impl<A, B, Block, C> Proposer<B, Block, C, A>
Either::Left((iterator, _)) => iterator,
Either::Right(_) => {
log::warn!(
"Timeout fired waiting for transaction pool to be ready. Proceeding to block production anyway.",
"Timeout fired waiting for transaction pool at block #{}. Proceeding with production.",
self.parent_number,
);
self.transaction_pool.ready()
}
Expand Down
3 changes: 3 additions & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>

fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
if self.ready_poll.lock().updated_at() >= at {
log::trace!(target: "txpool", "Transaction pool is already processed block #{}", at);
NikVolf marked this conversation as resolved.
Show resolved Hide resolved
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
return Box::pin(futures::future::ready(iterator));
}
Expand Down Expand Up @@ -456,6 +457,8 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();

log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes);

if let Err(e) = pool.prune_known(&block_id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}
Expand Down
23 changes: 15 additions & 8 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);
#[cfg(test)]
pub const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(1);

const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;

/// Payload from queue to worker.
struct WorkerPayload<Api: ChainApi> {
Expand Down Expand Up @@ -68,13 +68,20 @@ async fn batch_revalidate<Api: ChainApi>(
let mut invalid_hashes = Vec::new();
let mut revalidated = HashMap::new();

for ext_hash in batch {
let ext = match pool.validated_pool().ready_by_hash(&ext_hash) {
Some(ext) => ext,
None => continue,
};
let validation_results = futures::future::join_all(
batch.into_iter().filter_map(|ext_hash| {
pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
let api = api.clone();
async move {
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
.map(|validation_result| (validation_result, ext_hash.clone(), ext)).await
}
Comment on lines +74 to +78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let api = api.clone();
async move {
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
.map(|validation_result| (validation_result, ext_hash.clone(), ext)).await
}
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
.map(|validation_result| (validation_result, ext_hash.clone(), ext))

Should work as well?

Copy link
Contributor Author

@NikVolf NikVolf Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think I tried, api needs to be clone in every parallel future I think

})
})
).await;

match api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone()).await {
for (validation_result, ext_hash, ext) in validation_results {
match validation_result {
Ok(Err(TransactionValidityError::Invalid(err))) => {
log::debug!(target: "txpool", "[{:?}]: Revalidation: invalid {:?}", ext_hash, err);
invalid_hashes.push(ext_hash);
Expand Down Expand Up @@ -131,7 +138,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {

fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
let mut queued_exts = Vec::new();
let mut left = BACKGROUND_REVALIDATION_BATCH_SIZE;
let mut left = std::cmp::max(MIN_BACKGROUND_REVALIDATION_BATCH_SIZE, self.members.len() / 4);

// Take maximum of count transaction by order
// which they got into the pool
Expand Down