Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix asynchronous transaction rejections. #3817

Merged
merged 3 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/rpc/api/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ impl Subscriptions {
}
}

/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underyling event loop.
pub fn executor(&self) -> &TaskExecutor {
&self.executor
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
Expand Down
72 changes: 37 additions & 35 deletions core/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use log::warn;
use client::{self, Client};
use rpc::futures::{
Sink, Future,
stream::Stream as _,
future::result,
};
use futures03::{StreamExt as _, compat::Compat};
use futures03::{StreamExt as _, compat::Compat, future::ready};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
Expand Down Expand Up @@ -162,42 +161,45 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
let best_block_hash = self.client.info().chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
.map_err(error::Error::from)?;
Ok(self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.boxed()
.compat()
.map_err(|e| e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
))
Ok(
self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.map(|res| res.map_err(|e|
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use a second map_err here?
We could also just make one map_err that outputs the correct error directly.

))
)
};

let future_watcher = match submit() {
Ok(future_watcher) => future_watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};

// make 'future' watcher be a future with output = stream of watcher events
let future_watcher = future_watcher
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));

// convert a 'future' watcher into the stream with single element = stream of watcher events
let watcher_stream = future_watcher.into_stream();

// and now flatten the 'watcher_stream' so that we'll have the stream with watcher events
let watcher_stream = watcher_stream.flatten();
let subscriptions = self.subscriptions.clone();
let future = ready(submit())
.and_then(|res| res)
// convert the watcher into a `Stream`
.map(|res| res.map(|watcher| watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
// now handle the import result,
// start a new subscrition
.map(move |result| match result {
Ok(watcher) => {
subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|_| unimplemented!())
.send_all(Compat::new(watcher))
.map(|_| ())
});
},
Err(err) => {
warn!("Failed to submit extrinsic: {}", err);
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
},
});

self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher_stream)
.map(|_| ())
});
let res = self.subscriptions.executor()
.execute(Box::new(Compat::new(future.map(|_| Ok(())))));
if res.is_err() {
warn!("Error spawning subscription RPC task.");
}
}

fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
Expand Down
26 changes: 26 additions & 0 deletions core/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use test_client::{
self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt,
TestClientBuilderExt,
};
use rpc::futures::{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The curly braces can be removed.

Stream as _
};
use tokio::runtime;

fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic {
Expand Down Expand Up @@ -132,6 +135,29 @@ fn should_watch_extrinsic() {
);
}

#[test]
fn should_return_watch_validation_error() {
bkchr marked this conversation as resolved.
Show resolved Hide resolved
//given
let mut runtime = runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone())));
let keystore = KeyStore::new();
let p = Author {
client,
pool: pool.clone(),
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, _data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let (subscriber, id_rx, _data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test");
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");


// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());

// then
let res = runtime.block_on(id_rx).unwrap();
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
}

#[test]
fn should_return_pending_extrinsics() {
let runtime = runtime::Runtime::new().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub trait ChainApi: Send + Sync {
/// Error type.
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send;
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;

/// Verify extrinsic at given block.
fn validate_transaction(
Expand Down