Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: LocalTransactionPool implemented #6104

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions prdoc/pr_6104.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "LocalTransactionPool implemented for fork aware transaction pool"

doc:
- audience: Node Dev
description: |
LocalTransactionPool trait is implemented for fork aware transaction pool.

crates:
- name: sc-transaction-pool
bump: minor
9 changes: 9 additions & 0 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ impl ChainApi for TestApi {
})))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> sc_transaction_pool_api::error::Result<TransactionValidity> {
unimplemented!();
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down
37 changes: 12 additions & 25 deletions substrate/client/transaction-pool/src/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ where
.boxed()
}

/// Validates a transaction by calling into the runtime.
///
/// Same as `validate_transaction` but blocks the current thread when performing validation.
fn validate_transaction_blocking(
&self,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down Expand Up @@ -272,28 +284,3 @@ where

result
}

impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockBackend<Block>
+ BlockIdTo<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
/// Validates a transaction by calling into the runtime, same as
/// `validate_transaction` but blocks the current thread when performing
/// validation. Only implemented for `FullChainApi` since we can call into
/// the runtime locally.
pub fn validate_transaction_blocking(
&self,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}
}
9 changes: 9 additions & 0 deletions substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ impl ChainApi for TestApi {
futures::future::ready(Ok(res))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> error::Result<TransactionValidity> {
unimplemented!();
}

/// Returns a block number given the block id.
fn block_id_to_number(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ where
log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_result = self.mempool.extend_unwatched(source, xts.clone());
let mempool_result = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_result)).boxed()
Expand Down Expand Up @@ -838,16 +838,16 @@ where
fn submit_local(
&self,
_at: Block::Hash,
_xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
//todo [#5493]
//looks like view_store / view needs non async submit_local method ?.
let e = Err(sc_transaction_pool_api::error::Error::Unactionable.into());
log::warn!(
target: LOG_TARGET,
"LocalTransactionPool::submit_local is not implemented for ForkAwareTxPool, returning error: {e:?}",
);
e
log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
let xt = Arc::from(xt);
let result = self
.mempool
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

self.view_store.submit_local(xt).or_else(|_| Ok(result))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ where
pub(super) fn extend_unwatched(
&self,
source: TransactionSource,
xts: Vec<ExtrinsicFor<ChainApi>>,
xts: &[ExtrinsicFor<ChainApi>],
) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
let mut transactions = self.transactions.write();
let result = xts
.into_iter()
.iter()
.map(|xt| {
let hash = self.api.hash_and_length(&xt).0;
self.try_insert(
Expand Down Expand Up @@ -437,7 +437,7 @@ mod tx_mem_pool_tests {

let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
Expand All @@ -455,7 +455,7 @@ mod tx_mem_pool_tests {
let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
xts.push(xts.iter().last().unwrap().clone());

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max - 1).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
Expand All @@ -471,7 +471,7 @@ mod tx_mem_pool_tests {

let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));

let xt = Arc::from(uxt(98));
Expand All @@ -481,7 +481,7 @@ mod tx_mem_pool_tests {
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt]);
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
Expand All @@ -498,15 +498,15 @@ mod tx_mem_pool_tests {
let xt0 = xts.iter().last().unwrap().clone();
let xt1 = xts.iter().next().unwrap().clone();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));

let result = mempool.push_watched(TransactionSource::External, xt0);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt1]);
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt1]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
Expand All @@ -521,7 +521,7 @@ mod tx_mem_pool_tests {

let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts0);
let results = mempool.extend_unwatched(TransactionSource::External, &xts0);
assert!(results.iter().all(Result::is_ok));

let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
Expand Down
53 changes: 48 additions & 5 deletions substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use crate::{
LOG_TARGET,
};
use parking_lot::Mutex;
use sc_transaction_pool_api::{PoolStatus, TransactionSource};
use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionSource};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
traits::Block as BlockT, transaction_validity::TransactionValidityError, SaturatedConversion,
generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
SaturatedConversion,
};
use std::{collections::HashMap, sync::Arc, time::Instant};

Expand Down Expand Up @@ -178,6 +179,50 @@ where
self.pool.submit_and_watch(&self.at, source, xt).await
}

/// Synchronously imports single unvalidated extrinsics into the view.
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash);

let validity = self
.pool
.validated_pool()
.api()
.validate_transaction_blocking(
self.at.hash,
TransactionSource::Local,
Arc::from(xt.clone()),
)?
.map_err(|e| {
match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
}
.into()
})?;

let block_number = self
.pool
.validated_pool()
.api()
.block_id_to_number(&BlockId::hash(self.at.hash))?
.ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;

let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash,
TransactionSource::Local,
Arc::from(xt),
length,
validity,
);

self.pool.validated_pool().submit(vec![validated]).remove(0)
}

/// Status of the pool associated with the view.
pub(super) fn status(&self) -> PoolStatus {
self.pool.validated_pool().status()
Expand Down Expand Up @@ -243,9 +288,7 @@ where
let validation_result = (api.validate_transaction(self.at.hash, tx.source, tx.data.clone()).await, tx.hash, tx);
validation_results.push(validation_result);
} else {
{
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
}
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
should_break = true;
}
} => {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
ReadyIteratorFor, LOG_TARGET,
};
use futures::prelude::*;
use itertools::Itertools;
use parking_lot::RwLock;
use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TransactionSource};
use sp_blockchain::TreeRoute;
Expand Down Expand Up @@ -110,6 +111,37 @@ where
HashMap::<_, _>::from_iter(results.into_iter())
}

/// Synchronously imports single unverified extrinsics into every active view.
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let active_views = self
.active_views
.read()
.iter()
.map(|(_, view)| view.clone())
.collect::<Vec<_>>();

let tx_hash = self.api.hash_and_length(&xt).0;

let result = active_views
.iter()
.map(|view| {
self.dropped_stream_controller
.add_initial_views(std::iter::once(tx_hash), view.at.hash);
view.submit_local(xt.clone())
})
.find_or_first(Result::is_ok);

if let Some(Err(err)) = result {
log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
return Err(err)
};

Ok(tx_hash)
}

/// Import a single extrinsic and starts to watch its progress in the pool.
///
/// The extrinsic is imported to every view, and the individual streams providing the progress
Expand Down Expand Up @@ -155,12 +187,8 @@ where
let maybe_error = futures::future::join_all(submit_and_watch_futures)
.await
.into_iter()
.reduce(|mut r, v| {
if r.is_err() && v.is_ok() {
r = v;
}
r
});
.find_or_first(Result::is_ok);

if let Some(Err(err)) = maybe_error {
log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
return Err((err, Some(external_watcher)));
Expand Down
13 changes: 12 additions & 1 deletion substrate/client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,25 @@ pub trait ChainApi: Send + Sync {
+ Send
+ 'static;

/// Verify extrinsic at given block.
/// Asynchronously verify extrinsic at given block.
fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture;

/// Synchronously verify given extrinsic at given block.
///
/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
/// blocks the current thread when performing validation.
fn validate_transaction_blocking(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Result<TransactionValidity, Self::Error>;

/// Returns a block number given the block id.
fn block_id_to_number(
&self,
Expand Down
9 changes: 9 additions & 0 deletions substrate/test-utils/runtime/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ impl ChainApi for TestApi {
ready(Ok(Ok(validity)))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> Result<TransactionValidity, Error> {
unimplemented!();
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down
Loading