Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prepare for asynchronous transaction validation in tx pool #3650

Merged
merged 14 commits into from
Oct 1, 2019
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,12 @@ mod tests {
fn should_cease_building_block_when_deadline_is_reached() {
// given
let client = Arc::new(test_client::new());
let chain_api = transaction_pool::ChainApi::new(client.clone());
let chain_api = transaction_pool::FullChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));

txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();
futures::executor::block_on(
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap()
);

let mut proposer_factory = ProposerFactory {
client: client.clone(),
Expand Down
2 changes: 1 addition & 1 deletion core/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! # use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
//! # use transaction_pool::txpool::{self, Pool as TransactionPool};
//! # let client = Arc::new(test_client::new());
//! # let chain_api = transaction_pool::ChainApi::new(client.clone());
//! # let chain_api = transaction_pool::FullChainApi::new(client.clone());
//! # let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory {
Expand Down
13 changes: 13 additions & 0 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
who: PeerId,
extrinsics: message::Transactions<B::Extrinsic>
) {
// sending extrinsic to light node is considered a bad behavior
if !self.config.roles.is_full() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, forgot about this change - could create a separate PR for it, if required. Extrinsics from other nodes are ignored by light tx pool anyway => this change only decreases traffic on light client.

trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who);
self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, i32::min_value());
return;
}

// Accept extrinsics only when fully synced
if self.sync.status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
Expand Down Expand Up @@ -985,6 +993,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let extrinsics = self.transaction_pool.transactions();
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
// never send extrinsics to the light node
if !peer.info.roles.is_full() {
continue;
}

let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
Expand Down
19 changes: 9 additions & 10 deletions core/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,29 +302,28 @@ impl<A: ChainApi> AsyncApi<A> {
match msg {
ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext),
}
future::ready(())
});

future::join(extrinsics, http)
.map(|((), ())| ())
}

fn submit_extrinsic(&mut self, ext: Vec<u8>) {
fn submit_extrinsic(&mut self, ext: Vec<u8>) -> impl Future<Output = ()> {
let xt = match <A::Block as traits::Block>::Extrinsic::decode(&mut &*ext) {
Ok(xt) => xt,
Err(e) => {
warn!("Unable to decode extrinsic: {:?}: {}", ext, e.what());
return
return future::Either::Left(future::ready(()))
},
};

info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed());
match self.transaction_pool.submit_one(&self.at, xt.clone()) {
Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash),
Err(e) => {
debug!("Couldn't submit transaction: {:?}", e);
},
}
future::Either::Right(self.transaction_pool
.submit_one(&self.at, xt.clone())
.map(|result| match result {
Ok(hash) => { debug!("[{:?}] Offchain transaction added to the pool.", hash); },
Err(e) => { debug!("Couldn't submit transaction: {:?}", e); },
}))
}
}

Expand Down Expand Up @@ -354,7 +353,7 @@ mod tests {
let db = LocalStorage::new_test();
let client = Arc::new(test_client::new());
let pool = Arc::new(
Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))
Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))
);

let mock = Arc::new(MockNetworkStateInfo());
Expand Down
2 changes: 1 addition & 1 deletion core/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
// given
let _ = env_logger::try_init();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())));
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone())));
let db = client_db::offchain::LocalStorage::new_test();
let network_state = Arc::new(MockNetworkStateInfo());

Expand Down
3 changes: 3 additions & 0 deletions core/rpc/api/src/author/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use jsonrpc_core as rpc;
/// Author RPC Result type.
pub type Result<T> = std::result::Result<T, Error>;

/// Author RPC future Result type.
pub type FutureResult<T> = Box<dyn rpc::futures::Future<Item = T, Error = Error> + Send>;

/// Author RPC errors.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
Expand Down
4 changes: 2 additions & 2 deletions core/rpc/api/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use primitives::{
Bytes
};
use self::error::Result;
use self::error::{FutureResult, Result};
use txpool::watcher::Status;

pub use self::gen_client::Client as AuthorClient;
Expand All @@ -37,7 +37,7 @@ pub trait AuthorApi<Hash, BlockHash> {

/// Submit hex-encoded extrinsic for inclusion in block.
#[rpc(name = "author_submitExtrinsic")]
fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
fn submit_extrinsic(&self, extrinsic: Bytes) -> FutureResult<Hash>;

/// Insert a key into the keystore.
#[rpc(name = "author_insertKey")]
Expand Down
64 changes: 44 additions & 20 deletions core/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
mod tests;

use std::{sync::Arc, convert::TryInto};
use futures03::future::{FutureExt, TryFutureExt};
use log::{error, warn};

use client::{self, Client};
use rpc::futures::{Sink, Future};
use rpc::futures::{
Sink, Future,
future::{Executor, result},
};
use futures03::{StreamExt as _, compat::Compat};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use codec::{Encode, Decode};
use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr};
use sr_primitives::{generic, traits::{self, ProvideRuntimeApi}};
Expand All @@ -44,10 +48,12 @@ use session::SessionKeys;

/// Re-export the API for backward compatibility.
pub use api::author::*;
use self::error::{Error, Result};
use self::error::{Error, FutureResult, Result};

/// Authoring API
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
/// Futures executor.
executor: Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>,
/// Substrate client
client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,
/// Transactions pool
Expand All @@ -61,12 +67,14 @@ pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
impl<B, E, P, RA> Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
/// Create new instance of Authoring API.
pub fn new(
executor: Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>,
svyatonik marked this conversation as resolved.
Show resolved Hide resolved
client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,
pool: Arc<Pool<P>>,
subscriptions: Subscriptions,
keystore: BareCryptoStorePtr,
) -> Self {
Author {
executor,
client,
pool,
subscriptions,
Expand Down Expand Up @@ -108,15 +116,20 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
).map(Into::into).map_err(|e| Error::Client(Box::new(e)))
}

fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
let xt = Decode::decode(&mut &ext[..])?;
fn submit_extrinsic(&self, ext: Bytes) -> FutureResult<ExHash<P>> {
let xt = match Decode::decode(&mut &ext[..]) {
Ok(xt) => xt,
Err(err) => return Box::new(result(Err(err.into()))),
};
let best_block_hash = self.client.info().chain.best_hash;
self.pool
Box::new(self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), xt)
.boxed()
svyatonik marked this conversation as resolved.
Show resolved Hide resolved
.compat()
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()))
)
}

fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
Expand Down Expand Up @@ -151,30 +164,41 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info().chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])?;
self.pool
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
.map_err(|e| error::Error::from(e))?;
svyatonik marked this conversation as resolved.
Show resolved Hide resolved
Ok(self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.boxed()
.compat()
.map_err(|e| e.into_pool_error()
.map(Into::into)
.map(|e| error::Error::from(e))
svyatonik marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
))
};

let watcher = match submit() {
Ok(watcher) => watcher,
let future_watcher = match submit() {
Ok(future_watcher) => future_watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};

self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
.map(|_| ())
})
let subscriptions = self.subscriptions.clone();
let subscription_future = future_watcher
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
.map(move |watcher| subscriptions.add(subscriber, move |sink| {
sink
svyatonik marked this conversation as resolved.
Show resolved Hide resolved
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
.map(|_| ())
}));


svyatonik marked this conversation as resolved.
Show resolved Hide resolved
if self.executor.execute(Box::new(subscription_future)).is_err() {
error!("Failed to spawn watch extrinsic task");
}
}

fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
Expand Down
Loading