Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: rotator cache size now depends on pool's limits #7102

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
32 changes: 32 additions & 0 deletions prdoc/pr_7102.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
title: '`fatxpool`: rotator cache size now depends on pool''s limits'
doc:
- audience: Node Dev
description: |-
# Description

michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
This PR modifies the hard-coded size of extrinsics cache within [`PoolRotator`](https://github.com/paritytech/polkadot-sdk/blob/cdf107de700388a52a17b2fb852c98420c78278e/substrate/client/transaction-pool/src/graph/rotator.rs#L36-L45) to be inline with pool limits.

The problem was, that due to small size (comparing to number of txs in single block) of hard coded size:
https://github.com/paritytech/polkadot-sdk/blob/cdf107de700388a52a17b2fb852c98420c78278e/substrate/client/transaction-pool/src/graph/rotator.rs#L34
excessive number of unnecessary verification were performed in `prune_tags`:
https://github.com/paritytech/polkadot-sdk/blob/cdf107de700388a52a17b2fb852c98420c78278e/substrate/client/transaction-pool/src/graph/pool.rs#L369-L370

This was resulting in quite long durations of `prune_tags` execution time (which was ok for 6s, but becomes noticable for 2s blocks):
```
Pruning at HashAndNumber { number: 83, ... }. Resubmitting transactions: 6142, reverification took: 237.818955ms
Pruning at HashAndNumber { number: 84, ... }. Resubmitting transactions: 5985, reverification took: 222.118218ms
Pruning at HashAndNumber { number: 85, ... }. Resubmitting transactions: 5981, reverification took: 215.546847ms
```

The fix reduces the overhead:
```
Pruning at HashAndNumber { number: 92, ... }. Resubmitting transactions: 6325, reverification took: 14.728354ms
Pruning at HashAndNumber { number: 93, ... }. Resubmitting transactions: 7030, reverification took: 23.973607ms
Pruning at HashAndNumber { number: 94, ... }. Resubmitting transactions: 4465, reverification took: 9.532472ms
```

## Review Notes
I decided to leave the hardocded `EXPECTED_SIZE` for the legacy transaction pool. Removing verification of transactions during re-submission may negatively impact the behavior of the legace (single-state) pool. As in long-term we probably want to deprecate old pool, I did not invest time to assess the impact of rotator change in behavior of the legacy pool.
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
crates:
- name: sc-transaction-pool
bump: minor
12 changes: 10 additions & 2 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,22 @@ fn benchmark_main(c: &mut Criterion) {
c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
let api = Arc::from(TestApi::new_dependant());
bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 50, api);
bench_configured(
Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()),
50,
api,
);
});
});

c.bench_function("random 100 tx", |b| {
b.iter(|| {
let api = Arc::from(TestApi::default());
bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 100, api);
bench_configured(
Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()),
100,
api,
);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,5 @@ pub(crate) fn uxt(transfer: Transfer) -> Extrinsic {

pub(crate) fn pool() -> (Pool<TestApi>, Arc<TestApi>) {
let api = Arc::new(TestApi::default());
(Pool::new(Default::default(), true.into(), api.clone()), api)
(Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), api)
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ where
let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
loop {
if let Some(dropped) = ctx.get_pending_dropped_transaction() {
debug!("dropped_watcher: sending out (pending): {dropped:?}");
trace!("dropped_watcher: sending out (pending): {dropped:?}");
return Some((dropped, ctx));
}
tokio::select! {
biased;
Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
trace!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ where
pool_api.clone(),
listener.clone(),
metrics.clone(),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * options.total_count(),
options.ready.total_bytes + options.future.total_bytes,
));

Expand Down
49 changes: 38 additions & 11 deletions substrate/client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ impl Default for Options {
}
}

impl Options {
/// Total (ready+future) maximal number of transactions in the pool.
pub fn total_count(&self) -> usize {
self.ready.count + self.future.count
}
}

/// Should we check that the transaction is banned
/// in the pool, before we verify it?
#[derive(Copy, Clone)]
Expand All @@ -172,6 +179,21 @@ pub struct Pool<B: ChainApi> {
}

impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool with statically sized rotator.
pub fn new_with_staticly_sized_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
) -> Self {
Self {
validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
options,
is_validator,
api,
)),
}
}

/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
Expand Down Expand Up @@ -284,6 +306,7 @@ impl<B: ChainApi> Pool<B> {
let mut validated_counter: usize = 0;

let mut future_tags = Vec::new();
let now = Instant::now();
for (extrinsic, in_pool_tags) in all {
match in_pool_tags {
// reuse the tags for extrinsics that were found in the pool
Expand Down Expand Up @@ -319,7 +342,7 @@ impl<B: ChainApi> Pool<B> {
}
}

log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}");
log::debug!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}, took:{:?}", now.elapsed());

self.prune_tags(at, future_tags, in_pool_hashes).await
}
Expand Down Expand Up @@ -351,6 +374,7 @@ impl<B: ChainApi> Pool<B> {
tags: impl IntoIterator<Item = Tag>,
known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
) {
let now = Instant::now();
log::trace!(target: LOG_TARGET, "Pruning at {:?}", at);
// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(tags);
Expand All @@ -369,9 +393,8 @@ impl<B: ChainApi> Pool<B> {
let reverified_transactions =
self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await;

let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect();

log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len());
let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
log::debug!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}, reverification took: {:?}", &at, reverified_transactions.len(), now.elapsed());
log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}");

// And finally - submit reverified transactions back to the pool
Expand Down Expand Up @@ -580,7 +603,7 @@ mod tests {
fn should_reject_unactionable_transactions() {
// given
let api = Arc::new(TestApi::default());
let pool = Pool::new(
let pool = Pool::new_with_staticly_sized_rotator(
Default::default(),
// the node does not author blocks
false.into(),
Expand Down Expand Up @@ -767,7 +790,7 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let hash1 =
block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
Expand Down Expand Up @@ -803,7 +826,7 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

// when
block_on(
Expand Down Expand Up @@ -1036,7 +1059,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let xt = uxt(Transfer {
from: Alice.into(),
Expand Down Expand Up @@ -1074,7 +1097,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

// after validation `IncludeData` will have priority set to 9001
// (validate_transaction mock)
Expand Down Expand Up @@ -1106,7 +1129,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let han_of_block0 = api.expect_hash_and_number(0);

Expand Down Expand Up @@ -1151,7 +1174,11 @@ mod tests {
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let api = Arc::new(api);
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));

let han_of_block0 = api.expect_hash_and_number(0);

Expand Down
29 changes: 25 additions & 4 deletions substrate/client/transaction-pool/src/graph/rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,39 @@ pub struct PoolRotator<Hash> {
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
/// Expected size of the banned extrinsics cache.
expected_size: usize,
}

impl<Hash: Clone> Clone for PoolRotator<Hash> {
fn clone(&self) -> Self {
Self {
ban_time: self.ban_time,
banned_until: RwLock::new(self.banned_until.read().clone()),
expected_size: self.expected_size,
}
}
}

impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
Self { ban_time: Duration::from_secs(60 * 30), banned_until: Default::default() }
Self {
ban_time: Duration::from_secs(60 * 30),
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
banned_until: Default::default(),
expected_size: EXPECTED_SIZE,
}
}
}

impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// New rotator instance with specified ban time.
pub fn new(ban_time: Duration) -> Self {
Self { ban_time, banned_until: Default::default() }
Self { ban_time, ..Self::default() }
}

/// New rotator instance with specified ban time and expected cache size.
pub fn new_with_expected_size(ban_time: Duration, expected_size: usize) -> Self {
Self { expected_size, ..Self::new(ban_time) }
}

/// Returns `true` if extrinsic hash is currently banned.
Expand All @@ -69,8 +90,8 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
banned.insert(hash, *now + self.ban_time);
}

if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if banned.len() > 2 * self.expected_size {
while banned.len() > self.expected_size {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
Expand Down
31 changes: 28 additions & 3 deletions substrate/client/transaction-pool/src/graph/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,49 @@ impl<B: ChainApi> Clone for ValidatedPool<B> {
listener: Default::default(),
pool: RwLock::from(self.pool.read().clone()),
import_notification_sinks: Default::default(),
rotator: PoolRotator::default(),
rotator: self.rotator.clone(),
}
}
}

impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool with statically sized rotator.
pub fn new_with_staticly_sized_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
) -> Self {
let ban_time = options.ban_time;
Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time))
}

/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
let ban_time = options.ban_time;
let total_count = options.total_count();
Self::new_with_rotator(
options,
is_validator,
api,
PoolRotator::new_with_expected_size(ban_time, total_count),
)
}

fn new_with_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
rotator: PoolRotator<ExtrinsicHash<B>>,
) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
Self {
is_validator,
options,
listener: Default::default(),
api,
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: PoolRotator::new(ban_time),
rotator,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,11 @@ mod tests {
#[test]
fn revalidation_queue_works() {
let api = Arc::new(TestApi::default());
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));
let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

let uxt = uxt(Transfer {
Expand Down Expand Up @@ -414,7 +418,11 @@ mod tests {
#[test]
fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
let api = Arc::new(TestApi::default());
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));
let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

let uxt0 = uxt(Transfer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ where
finalized_hash: Block::Hash,
options: graph::Options,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
options,
true.into(),
pool_api.clone(),
));
let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
pool_api.clone(),
pool.clone(),
Expand Down Expand Up @@ -177,7 +181,11 @@ where
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
) -> Self {
let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
options,
is_validator,
pool_api.clone(),
));
let (revalidation_queue, background_task) = match revalidation_type {
RevalidationType::Light =>
(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
Expand Down
Loading
Loading