From 6f092db7634733f2fd485be5ff72ffc64e9cc9ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 14 Oct 2019 16:14:57 +0200 Subject: [PATCH 1/3] Fix handling transaction pool errors. --- core/rpc/api/src/subscriptions.rs | 7 +++ core/rpc/src/author/mod.rs | 72 +++++++++++++------------ core/transaction-pool/graph/src/pool.rs | 2 +- 3 files changed, 45 insertions(+), 36 deletions(-) diff --git a/core/rpc/api/src/subscriptions.rs b/core/rpc/api/src/subscriptions.rs index a1e486138fd7c..d5ca74fa60bc7 100644 --- a/core/rpc/api/src/subscriptions.rs +++ b/core/rpc/api/src/subscriptions.rs @@ -69,6 +69,13 @@ impl Subscriptions { } } + /// Borrows the internal task executor. + /// + /// This can be used to spawn additional tasks on the underyling event loop. + pub fn executor(&self) -> &TaskExecutor { + &self.executor + } + /// Creates new subscription for given subscriber. /// /// Second parameter is a function that converts Subscriber sink into a future. diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index 9a978f22f717a..bcf54031753cb 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -26,10 +26,9 @@ use log::warn; use client::{self, Client}; use rpc::futures::{ Sink, Future, - stream::Stream as _, future::result, }; -use futures03::{StreamExt as _, compat::Compat}; +use futures03::{StreamExt as _, compat::Compat, future::ready}; use api::Subscriptions; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use codec::{Encode, Decode}; @@ -162,42 +161,45 @@ impl AuthorApi, BlockHash

> for Author whe let best_block_hash = self.client.info().chain.best_hash; let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]) .map_err(error::Error::from)?; - Ok(self.pool - .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) - .boxed() - .compat() - .map_err(|e| e.into_pool_error() - .map(error::Error::from) - .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - )) + Ok( + self.pool + .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) + .map(|res| res.map_err(|e| + e.into_pool_error() + .map(error::Error::from) + .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) + )) + ) }; - let future_watcher = match submit() { - Ok(future_watcher) => future_watcher, - Err(err) => { - // reject the subscriber (ignore errors - we don't care if subscriber is no longer there). - let _ = subscriber.reject(err.into()); - return; - }, - }; - - // make 'future' watcher be a future with output = stream of watcher events - let future_watcher = future_watcher - .map_err(|err| { warn!("Failed to submit extrinsic: {}", err); }) - .map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))); - - // convert a 'future' watcher into the stream with single element = stream of watcher events - let watcher_stream = future_watcher.into_stream(); - - // and now flatten the 'watcher_stream' so that we'll have the stream with watcher events - let watcher_stream = watcher_stream.flatten(); + let subscriptions = self.subscriptions.clone(); + let future = ready(submit()) + .and_then(|res| res) + // convert the watcher into a `Stream` + .map(|res| res.map(|watcher| watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) + // now handle the import result, + // start a new subscrition + .map(move |result| match result { + Ok(watcher) => { + subscriptions.add(subscriber, move |sink| { + sink + .sink_map_err(|_| unimplemented!()) + .send_all(Compat::new(watcher)) + .map(|_| ()) + }); + }, + Err(err) => { + warn!("Failed to submit extrinsic: {}", err); + // reject the subscriber (ignore errors - we don't care if subscriber is no longer there). + let _ = subscriber.reject(err.into()); + }, + }); - self.subscriptions.add(subscriber, move |sink| { - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(watcher_stream) - .map(|_| ()) - }); + let res = self.subscriptions.executor() + .execute(Box::new(Compat::new(future.map(|_| Ok(()))))); + if res.is_err() { + warn!("Error spawning subscription RPC task."); + } } fn unwatch_extrinsic(&self, _metadata: Option, id: SubscriptionId) -> Result { diff --git a/core/transaction-pool/graph/src/pool.rs b/core/transaction-pool/graph/src/pool.rs index 53b2a62cbee89..621aeabda8ee9 100644 --- a/core/transaction-pool/graph/src/pool.rs +++ b/core/transaction-pool/graph/src/pool.rs @@ -66,7 +66,7 @@ pub trait ChainApi: Send + Sync { /// Error type. type Error: From + error::IntoPoolError; /// Validate transaction future. - type ValidationFuture: Future> + Send; + type ValidationFuture: Future> + Send + Unpin; /// Verify extrinsic at given block. fn validate_transaction( From 4cec07205dfbb6f42a754c30c4e7fa85749943c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 14 Oct 2019 16:45:50 +0200 Subject: [PATCH 2/3] Add test. --- core/rpc/src/author/tests.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index 202dab4940c39..2ba76c3c848e4 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -31,6 +31,9 @@ use test_client::{ self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt, TestClientBuilderExt, }; +use rpc::futures::{ + Stream as _ +}; use tokio::runtime; fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic { @@ -132,6 +135,29 @@ fn should_watch_extrinsic() { ); } +#[test] +fn should_return_watch_validation_error() { + //given + let mut runtime = runtime::Runtime::new().unwrap(); + let client = Arc::new(test_client::new()); + let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone()))); + let keystore = KeyStore::new(); + let p = Author { + client, + pool: pool.clone(), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), + keystore: keystore.clone(), + }; + let (subscriber, id_rx, _data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test"); + + // when + p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into()); + + // then + let res = runtime.block_on(id_rx).unwrap(); + assert!(res.is_err(), "Expected the transaction to be rejected as invalid."); +} + #[test] fn should_return_pending_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); From 9c7be5c4795ce5b3232ea66a03bab5108d152fe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 14 Oct 2019 21:22:24 +0200 Subject: [PATCH 3/3] Review suggestions. --- core/rpc/src/author/mod.rs | 9 ++++----- core/rpc/src/author/tests.rs | 14 ++++++-------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/core/rpc/src/author/mod.rs b/core/rpc/src/author/mod.rs index bcf54031753cb..82122dcf3d21f 100644 --- a/core/rpc/src/author/mod.rs +++ b/core/rpc/src/author/mod.rs @@ -164,11 +164,10 @@ impl AuthorApi, BlockHash

> for Author whe Ok( self.pool .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) - .map(|res| res.map_err(|e| - e.into_pool_error() - .map(error::Error::from) - .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) - )) + .map_err(|e| e.into_pool_error() + .map(error::Error::from) + .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) + ) ) }; diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index 2ba76c3c848e4..e8ba4c132a0ea 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -19,20 +19,18 @@ use super::*; use std::sync::Arc; use assert_matches::assert_matches; use codec::Encode; -use transaction_pool::{ - txpool::Pool, - FullChainApi, -}; use primitives::{ H256, blake2_256, hexdisplay::HexDisplay, testing::{ED25519, SR25519, KeyStore}, ed25519, crypto::Pair, }; +use rpc::futures::Stream as _; use test_client::{ self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt, TestClientBuilderExt, }; -use rpc::futures::{ - Stream as _ +use transaction_pool::{ + txpool::Pool, + FullChainApi, }; use tokio::runtime; @@ -105,7 +103,7 @@ fn should_watch_extrinsic() { subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; - let (subscriber, id_rx, data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test"); + let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test"); // when p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into()); @@ -148,7 +146,7 @@ fn should_return_watch_validation_error() { subscriptions: Subscriptions::new(Arc::new(runtime.executor())), keystore: keystore.clone(), }; - let (subscriber, id_rx, _data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test"); + let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test"); // when p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());