Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Transactionpool: Make ready_at return earlier #8995

Merged
1 commit merged into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,13 @@ mod tests {
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(),
Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down Expand Up @@ -373,6 +379,7 @@ mod tests {
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down Expand Up @@ -453,6 +460,7 @@ mod tests {
None,
RevalidationType::Full,
spawner.clone(),
0,
));
let env = ProposerFactory::new(
spawner.clone(),
Expand Down
66 changes: 49 additions & 17 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
}

impl<T, Block: BlockT> ReadyPoll<T, Block> {
fn new(best_block_number: NumberFor<Block>) -> Self {
Self {
updated_at: best_block_number,
pollers: Default::default(),
}
}

fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
self.updated_at = number;

Expand Down Expand Up @@ -189,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
spawner: impl SpawnNamed,
best_block_number: NumberFor<Block>,
) -> Self {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type {
Expand All @@ -213,7 +221,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
RevalidationType::Full => RevalidationStrategy::Always,
}
)),
ready_poll: Default::default(),
ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
metrics: PrometheusMetrics::new(prometheus),
}
}
Expand Down Expand Up @@ -309,21 +317,29 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
}

fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
let status = self.status();
// If there are no transactions in the pool, it is fine to return early.
//
// There could be transaction being added because of some re-org happening at the relevant
// block, but this is relative unlikely.
if status.ready == 0 && status.future == 0 {
return async { Box::new(std::iter::empty()) as Box<_> }.boxed()
}

if self.ready_poll.lock().updated_at() >= at {
log::trace!(target: "txpool", "Transaction pool already processed block #{}", at);
let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
return Box::pin(futures::future::ready(iterator));
return async move { iterator }.boxed();
}

Box::pin(
self.ready_poll
.lock()
.add(at)
.map(|received| received.unwrap_or_else(|e| {
log::warn!("Error receiving pending set: {:?}", e);
Box::new(vec![].into_iter())
}))
)
self.ready_poll
.lock()
.add(at)
.map(|received| received.unwrap_or_else(|e| {
log::warn!("Error receiving pending set: {:?}", e);
Box::new(std::iter::empty())
}))
.boxed()
}

fn ready(&self) -> ReadyIteratorFor<PoolApi> {
Expand All @@ -334,7 +350,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher>
where
Block: BlockT,
Client: sp_blockchain::HeaderBackend<Block> + 'static,
Client: sp_blockchain::HeaderBackend<Block> + sc_client_api::UsageProvider<Block> + 'static,
Fetcher: sc_client_api::Fetcher<Block> + 'static,
{
/// Create new basic transaction pool for a light node with the provided api.
Expand All @@ -345,9 +361,15 @@ where
client: Arc<Client>,
fetcher: Arc<Fetcher>,
) -> Self {
let pool_api = Arc::new(LightChainApi::new(client, fetcher));
let pool_api = Arc::new(LightChainApi::new(client.clone(), fetcher));
Self::with_revalidation_type(
options, false.into(), pool_api, prometheus, RevalidationType::Light, spawner,
options,
false.into(),
pool_api,
prometheus,
RevalidationType::Light,
spawner,
client.usage_info().chain.best_number,
)
}
}
Expand All @@ -357,8 +379,12 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static,
+ sp_runtime::traits::BlockIdTo<Block>
+ sc_client_api::ExecutorProvider<Block>
+ sc_client_api::UsageProvider<Block>
+ Send
+ Sync
+ 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
/// Create new basic transaction pool for a full node with the provided api.
Expand All @@ -371,7 +397,13 @@ where
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool = Arc::new(Self::with_revalidation_type(
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
options,
is_validator,
pool_api,
prometheus,
RevalidationType::Full,
spawner,
client.usage_info().chain.best_number,
));

// make transaction pool available for off-chain runtime calls.
Expand Down