From 1e52e106cfcc19f2aab5852ad8b7130ae236a23d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 12:04:12 +0200 Subject: [PATCH 01/34] rpc-v2/tx: Add broadcast method Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/api.rs | 14 +++++++++++++- .../rpc-spec-v2/src/transaction/transaction.rs | 10 +++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 3eb6cb204f24..965abcf3806f 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -19,7 +19,7 @@ //! API trait for transactions. use crate::transaction::event::TransactionEvent; -use jsonrpsee::proc_macros::rpc; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use sp_core::Bytes; #[rpc(client, server)] @@ -28,10 +28,22 @@ pub trait TransactionApi { /// /// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on /// transaction life cycle. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. #[subscription( name = "transaction_unstable_submitAndWatch" => "transaction_unstable_watchEvent", unsubscribe = "transaction_unstable_unwatch", item = TransactionEvent, )] fn submit_and_watch(&self, bytes: Bytes); + + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_continue", blocking)] + fn broadcast(&self, bytes: Bytes) -> RpcResult>; } diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index b2cfa36c9c99..47419d63d6a3 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -29,7 +29,11 @@ use crate::{ }, SubscriptionTaskExecutor, }; -use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink}; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + types::error::ErrorObject, + PendingSubscriptionSink, +}; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, @@ -132,6 +136,10 @@ where sc_rpc::utils::spawn_subscription_task(&self.executor, fut); } + + fn broadcast(&self, _bytes: Bytes) -> RpcResult> { + Ok(None) + } } /// The transaction's state that needs to be preserved between From e795285dcf86a3c2440d478d23f42ab169255eb8 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 14:59:39 +0200 Subject: [PATCH 02/34] rpc-v2/tx: Submit the transaction to pool until finalized or usurped Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction.rs | 65 ++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 47419d63d6a3..6ed06bbcf3ab 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -137,7 +137,70 @@ where sc_rpc::utils::spawn_subscription_task(&self.executor, fut); } - fn broadcast(&self, _bytes: Bytes) -> RpcResult> { + fn broadcast(&self, bytes: Bytes) -> RpcResult> { + let client = self.client.clone(); + let pool = self.pool.clone(); + + let fut = async move { + // There is nothing we could do with an extrinsic of invalid format. + let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { + return + }; + + // Flag to determine if the we should broadcast the transaction again. + let mut is_done = false; + + while !is_done { + let best_block_hash = client.info().best_hash; + let submit = + pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); + + // The transaction was not included to the pool, because it is invalid. + // However an invalid transaction can become valid at a later time. + let Ok(mut stream) = submit.await else { return }; + + while let Some(event) = stream.next().await { + match event { + // The transaction propagation stops when: + // - The transaction was included in a finalized block via + // `TransactionStatus::Finalized`. + TransactionStatus::Finalized(_) | + // - The block in which the transaction was included could not be finalized for + // more than 256 blocks via `TransactionStatus::FinalityTimeout`. This could + // happen when: + // - the finality gadget is lagging behing + // - the finality gadget is not available for the chain + TransactionStatus::FinalityTimeout(_) | + // - The transaction has been replaced by another transaction with identical tags + // (same sender and same account nonce). + TransactionStatus::Usurped(_) => { + is_done = true; + break; + }, + + // Dropped transaction may renter the pool at a later time, when other + // transactions have been finalized and remove from the pool. + TransactionStatus::Dropped | + // An invalid transaction may become valid at a later time. + TransactionStatus::Invalid => { + break; + }, + + // The transaction is still in the pool, the ready or future queue. + TransactionStatus::Ready | TransactionStatus::Future | + // Transaction has been broadcasted as intended. + TransactionStatus::Broadcast(_) | + // Transaction has been included in a block, but the block is not finalized yet. + TransactionStatus::InBlock(_) | + // Transaction has been retracted, but it may be included in a block at a later time. + TransactionStatus::Retracted(_) => (), + } + } + } + }; + + sc_rpc::utils::spawn_subscription_task(&self.executor, fut); + Ok(None) } } From c1e93962d689fab577cadbf40221be7def952346 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 15:07:16 +0200 Subject: [PATCH 03/34] rpc-v2/tx: Generate random operation ID Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + substrate/client/rpc-spec-v2/Cargo.toml | 1 + .../src/transaction/transaction.rs | 43 ++++++++++++++++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d45d620478e..686bc06ba134 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16358,6 +16358,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.12.1", "pretty_assertions", + "rand", "sc-block-builder", "sc-chain-spec", "sc-client-api", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index e665ddd4f6bc..659ba896326d 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -41,6 +41,7 @@ tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = "0.4.17" futures-util = { version = "0.3.30", default-features = false } +rand = "0.8.5" [dev-dependencies] serde_json = "1.0.111" diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 6ed06bbcf3ab..2166c4b988c2 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -34,17 +34,18 @@ use jsonrpsee::{ types::error::ErrorObject, PendingSubscriptionSink, }; +use parking_lot::RwLock; +use rand::{distributions::Alphanumeric, Rng}; +use sc_rpc::utils::pipe_from_stream; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; -use std::sync::Arc; - -use sc_rpc::utils::pipe_from_stream; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; +use std::{collections::HashSet, sync::Arc}; use codec::Decode; use futures::{StreamExt, TryFutureExt}; @@ -57,12 +58,41 @@ pub struct Transaction { pool: Arc, /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, + /// The brodcast operation IDs. + broadcast_ids: Arc>>, } impl Transaction { /// Creates a new [`Transaction`]. pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { - Transaction { client, pool, executor } + Transaction { client, pool, executor, broadcast_ids: Default::default() } + } + + /// Generate and track an unique operation ID for the `transaction_broadcast` RPC method. + pub fn insert_unique_id(&self) -> String { + let generate_operation_id = || { + // The lenght of the operation ID. + const OPERATION_ID_LEN: usize = 16; + + let mut rng = rand::thread_rng(); + (&mut rng) + .sample_iter(Alphanumeric) + .take(OPERATION_ID_LEN) + .map(char::from) + .collect::() + }; + + let mut id = generate_operation_id(); + + let mut broadcast_ids = self.broadcast_ids.write(); + + while broadcast_ids.contains(&id) { + id = generate_operation_id(); + } + + broadcast_ids.insert(id.clone()); + + id } } @@ -141,6 +171,9 @@ where let client = self.client.clone(); let pool = self.pool.clone(); + // The ID is unique and has been inserted to the broadcast ID set. + let id = self.insert_unique_id(); + let fut = async move { // There is nothing we could do with an extrinsic of invalid format. let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { @@ -201,7 +234,7 @@ where sc_rpc::utils::spawn_subscription_task(&self.executor, fut); - Ok(None) + Ok(Some(id)) } } From 1a8ba2597f2a580a5bc9b116934341ebb83a5102 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 15:18:52 +0200 Subject: [PATCH 04/34] rpc-v2/tx: Add api for `transaction_unstable_stop` Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/api.rs | 10 +++++++++- .../client/rpc-spec-v2/src/transaction/transaction.rs | 4 ++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 965abcf3806f..ebd919019870 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -44,6 +44,14 @@ pub trait TransactionApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_continue", blocking)] + #[method(name = "transaction_unstable_broadcast", blocking)] fn broadcast(&self, bytes: Bytes) -> RpcResult>; + + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "transaction_unstable_stop", blocking)] + fn stop_broadcast(&self, operation_id: String) -> RpcResult<()>; } diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 2166c4b988c2..2bb2b38436d4 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -236,6 +236,10 @@ where Ok(Some(id)) } + + fn stop_broadcast(&self, _operation_id: String) -> RpcResult<()> { + Ok(()) + } } /// The transaction's state that needs to be preserved between From 3df160b877904edb5831b8d12619f8b2b9ceb188 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 19:36:43 +0200 Subject: [PATCH 05/34] rpc-v2/tx: Make the brodcasting future abortable Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction.rs | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 2bb2b38436d4..1444ed327655 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -29,6 +29,9 @@ use crate::{ }, SubscriptionTaskExecutor, }; +use codec::Decode; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures_util::stream::AbortHandle; use jsonrpsee::{ core::{async_trait, RpcResult}, types::error::ErrorObject, @@ -45,10 +48,7 @@ use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashSet, sync::Arc}; - -use codec::Decode; -use futures::{StreamExt, TryFutureExt}; +use std::{collections::HashMap, sync::Arc}; /// An API for transaction RPC calls. pub struct Transaction { @@ -59,7 +59,12 @@ pub struct Transaction { /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, /// The brodcast operation IDs. - broadcast_ids: Arc>>, + broadcast_ids: Arc>>, +} + +struct BroadcastState { + /// Handle to abort the running future that broadcasts the transaction. + handle: AbortHandle, } impl Transaction { @@ -68,8 +73,8 @@ impl Transaction { Transaction { client, pool, executor, broadcast_ids: Default::default() } } - /// Generate and track an unique operation ID for the `transaction_broadcast` RPC method. - pub fn insert_unique_id(&self) -> String { + /// Generate an unique operation ID for the `transaction_broadcast` RPC method. + pub fn generate_unique_id(&self) -> String { let generate_operation_id = || { // The lenght of the operation ID. const OPERATION_ID_LEN: usize = 16; @@ -84,14 +89,12 @@ impl Transaction { let mut id = generate_operation_id(); - let mut broadcast_ids = self.broadcast_ids.write(); + let broadcast_ids = self.broadcast_ids.read(); - while broadcast_ids.contains(&id) { + while broadcast_ids.contains_key(&id) { id = generate_operation_id(); } - broadcast_ids.insert(id.clone()); - id } } @@ -171,10 +174,10 @@ where let client = self.client.clone(); let pool = self.pool.clone(); - // The ID is unique and has been inserted to the broadcast ID set. - let id = self.insert_unique_id(); + // The unique ID of this operation. + let id = self.generate_unique_id(); - let fut = async move { + let broadcast_transaction_fut = async move { // There is nothing we could do with an extrinsic of invalid format. let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { return @@ -232,6 +235,19 @@ where } }; + // Convert the future into an abortable future, for easily terminating it from the + // `transaction_stop` method. + let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); + // The future expected by the executor must be `Future` instead of + // `Future>`. + let fut = fut.map(drop); + + // Keep track of this entry and the abortable handle. + { + let mut broadcast_ids = self.broadcast_ids.write(); + broadcast_ids.insert(id.clone(), BroadcastState { handle }); + } + sc_rpc::utils::spawn_subscription_task(&self.executor, fut); Ok(Some(id)) From badd863b3c84f076691be640c826bcbb0be3b983 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jan 2024 19:41:47 +0200 Subject: [PATCH 06/34] rpc-v2/tx: Stop the brodcasting by aborting the future Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/transaction.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 1444ed327655..6e1eee8901c0 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -253,7 +253,13 @@ where Ok(Some(id)) } - fn stop_broadcast(&self, _operation_id: String) -> RpcResult<()> { + fn stop_broadcast(&self, operation_id: String) -> RpcResult<()> { + let mut broadcast_ids = self.broadcast_ids.write(); + + // TODO: Signal error on wrong operation ID. + let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { return Ok(()) }; + broadcast_state.handle.abort(); + Ok(()) } } From 7f788aa5205165dfc71fb0ac6f4ab35d2e3742cc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 13:40:38 +0200 Subject: [PATCH 07/34] rpc-v2/tx: Use best block stream for broadcasting the tx Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/transaction.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 6e1eee8901c0..4a0bab96dc07 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -37,6 +37,8 @@ use jsonrpsee::{ types::error::ErrorObject, PendingSubscriptionSink, }; +use sc_client_api::BlockchainEvents; + use parking_lot::RwLock; use rand::{distributions::Alphanumeric, Rng}; use sc_rpc::utils::pipe_from_stream; @@ -119,7 +121,12 @@ where Pool: TransactionPool + Sync + Send + 'static, Pool::Hash: Unpin, ::Hash: Unpin, - Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, + Client: HeaderBackend + + BlockchainEvents + + ProvideRuntimeApi + + Send + + Sync + + 'static, { fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); @@ -171,12 +178,16 @@ where } fn broadcast(&self, bytes: Bytes) -> RpcResult> { - let client = self.client.clone(); let pool = self.pool.clone(); // The unique ID of this operation. let id = self.generate_unique_id(); + let mut best_block_import_stream = + Box::pin(self.client.import_notification_stream().filter_map( + |notification| async move { notification.is_new_best.then_some(notification.hash) }, + )); + let broadcast_transaction_fut = async move { // There is nothing we could do with an extrinsic of invalid format. let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { @@ -187,7 +198,8 @@ where let mut is_done = false; while !is_done { - let best_block_hash = client.info().best_hash; + let Some(best_block_hash) = best_block_import_stream.next().await else { return }; + let submit = pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); From eef27518fd25767cdecdb4d7509f29ab806f24df Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 14:05:32 +0200 Subject: [PATCH 08/34] rpc-v2/tx: Implement `Stream::last` for best block imports Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/transaction.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 4a0bab96dc07..20d81ae9f5e0 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -198,7 +198,21 @@ where let mut is_done = false; while !is_done { - let Some(best_block_hash) = best_block_import_stream.next().await else { return }; + // Wait for the next block to become available. + let Some(mut best_block_hash) = best_block_import_stream.next().await else { + return + }; + // We are effectively polling the stream for the last available item at this time. + // The `now_or_never` returns `None` if the stream is `Pending`. + // + // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. + while let Some(next) = best_block_import_stream.next().now_or_never() { + let Some(next) = next else { + // Nothing to do if the best block stream terminated. + return + }; + best_block_hash = next; + } let submit = pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); From 96a80b0a1392276787aaf4579a06e687ed16259c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 17:50:07 +0200 Subject: [PATCH 09/34] rpc-v2/tx: Simplify tx trait bounds Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/transaction.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 20d81ae9f5e0..2b284b020a2c 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -46,7 +46,6 @@ use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; -use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; @@ -121,12 +120,7 @@ where Pool: TransactionPool + Sync + Send + 'static, Pool::Hash: Unpin, ::Hash: Unpin, - Client: HeaderBackend - + BlockchainEvents - + ProvideRuntimeApi - + Send - + Sync - + 'static, + Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, { fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); From 4e72ce7862f7685fc22caa2678c85f019ee2081c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 17:59:46 +0200 Subject: [PATCH 10/34] rpc-v2/chainHead: Make test util crate public Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/chain_head/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index 4cbbd00f64f3..c9fe19aca2b1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -23,7 +23,7 @@ //! Methods are prefixed by `chainHead`. #[cfg(test)] -mod test_utils; +pub mod test_utils; #[cfg(test)] mod tests; From 70c7d08121aa841a524990c23bef7a32aa6540ba Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:00:08 +0200 Subject: [PATCH 11/34] tx/tests: Check brodcasted transaction enters pool Signed-off-by: Alexandru Vasile --- Cargo.lock | 2 + substrate/client/rpc-spec-v2/Cargo.toml | 2 + .../client/rpc-spec-v2/src/transaction/mod.rs | 3 + .../rpc-spec-v2/src/transaction/tests.rs | 97 +++++++++++++++++++ 4 files changed, 104 insertions(+) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 686bc06ba134..bea17af9c03f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16364,6 +16364,7 @@ dependencies = [ "sc-client-api", "sc-rpc", "sc-service", + "sc-transaction-pool", "sc-transaction-pool-api", "sc-utils", "serde", @@ -16379,6 +16380,7 @@ dependencies = [ "sp-version", "substrate-test-runtime", "substrate-test-runtime-client", + "substrate-test-runtime-transaction-pool", "thiserror", "tokio", "tokio-stream", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 659ba896326d..e87229d0b1fa 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -48,6 +48,7 @@ serde_json = "1.0.111" tokio = { version = "1.22.0", features = ["macros"] } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } substrate-test-runtime = { path = "../../test-utils/runtime" } +substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" } sp-consensus = { path = "../../primitives/consensus/common" } sp-externalities = { path = "../../primitives/externalities" } sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" } @@ -55,3 +56,4 @@ sc-block-builder = { path = "../block-builder" } sc-service = { path = "../service", features = ["test-helpers"] } assert_matches = "1.3.0" pretty_assertions = "1.2.1" +sc-transaction-pool = { path = "../transaction-pool" } diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs index 212912ba1c72..2f34a6013460 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs @@ -25,6 +25,9 @@ //! //! Methods are prefixed by `transaction`. +#[cfg(test)] +mod tests; + pub mod api; pub mod error; pub mod event; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs new file mode 100644 index 000000000000..c1b55b7b70a1 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -0,0 +1,97 @@ +use super::*; +use crate::{ + chain_head::test_utils::ChainHeadMockClient, transaction::Transaction as RpcTransaction, +}; +use codec::Encode; +use futures::Future; +use jsonrpsee::rpc_params; +use sc_transaction_pool::*; +use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; +use sp_core::{ + hexdisplay::{AsBytesRef, HexDisplay}, + testing::TaskExecutor, +}; +use std::{pin::Pin, sync::Arc, time::Duration}; +use substrate_test_runtime_client::{prelude::*, AccountKeyring::*}; +use substrate_test_runtime_transaction_pool::{uxt, TestApi}; + +/// Util function to encode a value as a hex string +pub fn hex_string(data: &Data) -> String { + format!("0x{:?}", HexDisplay::from(data)) +} + +type Block = substrate_test_runtime_client::runtime::Block; + +fn create_basic_pool_with_genesis( + test_api: Arc, +) -> (BasicPool, Pin + Send>>) { + let genesis_hash = { + test_api + .chain() + .read() + .block_by_number + .get(&0) + .map(|blocks| blocks[0].0.header.hash()) + .expect("there is block 0. qed") + }; + BasicPool::new_test(test_api, genesis_hash, genesis_hash) +} + +fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { + let api = Arc::new(TestApi::with_alice_nonce(209)); + let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); + + let thread_pool = futures::executor::ThreadPool::new().unwrap(); + thread_pool.spawn_ok(background_task); + (pool, api, thread_pool) +} + +#[tokio::test] +async fn tx_broadcast_enters_pool() { + let builder = TestClientBuilder::new(); + let _backend = builder.backend(); + let client = Arc::new(builder.build()); + + let (pool, api, _) = maintained_pool(); + let pool = Arc::new(pool); + + let uxt = uxt(Alice, 209); + let xt = hex_string(&uxt.encode()); + + let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + let tx_api = + RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(TaskExecutor::default())) + .into_rpc(); + + // Start at block 1. + let block_1_header = api.push_block(1, vec![], true); + let _block_1 = api.expect_hash_from_number(1); + + let _operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + + // TODO: Improve testability by extending the `transaction_unstable_broadcast` with + // a middleware trait that intercepts the transaction status for testing. + let mut num_retries = 12; + while num_retries > 0 && pool.status().ready != 1 { + tokio::time::sleep(Duration::from_secs(5)).await; + num_retries -= 1; + } + assert_eq!(1, pool.status().ready); + assert_eq!(uxt.encode().len(), pool.status().ready_bytes); + + // Import block 2 with the transaction included. + let block_2_header = api.push_block(2, vec![uxt.clone()], true); + let block_2 = block_2_header.hash(); + + // Announce block 2 to the pool. + let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; + pool.maintain(event).await; + + assert_eq!(0, pool.status().ready); +} From 0cd5a8004a5ec0576dbf2bb54de06a93c795900f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:08:25 +0200 Subject: [PATCH 12/34] rpc-v2/tx: Split transactionWatch and transactionBroadcast Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/api.rs | 3 + .../client/rpc-spec-v2/src/transaction/mod.rs | 4 +- .../src/transaction/transaction.rs | 163 +------------ .../src/transaction/transaction_broadcast.rs | 214 ++++++++++++++++++ 4 files changed, 225 insertions(+), 159 deletions(-) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index ebd919019870..9a583a27060d 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -38,7 +38,10 @@ pub trait TransactionApi { item = TransactionEvent, )] fn submit_and_watch(&self, bytes: Bytes); +} +#[rpc(client, server)] +pub trait TransactionBroadcastApi { /// Broadcast an extrinsic to the chain. /// /// # Unstable diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs index 2f34a6013460..74268a5372a3 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs @@ -32,10 +32,12 @@ pub mod api; pub mod error; pub mod event; pub mod transaction; +pub mod transaction_broadcast; -pub use api::TransactionApiServer; +pub use api::{TransactionApiServer, TransactionBroadcastApiServer}; pub use event::{ TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError, TransactionEvent, }; pub use transaction::Transaction; +pub use transaction_broadcast::TransactionBroadcast; diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index 2b284b020a2c..17889b3bad2a 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -30,17 +30,8 @@ use crate::{ SubscriptionTaskExecutor, }; use codec::Decode; -use futures::{FutureExt, StreamExt, TryFutureExt}; -use futures_util::stream::AbortHandle; -use jsonrpsee::{ - core::{async_trait, RpcResult}, - types::error::ErrorObject, - PendingSubscriptionSink, -}; -use sc_client_api::BlockchainEvents; - -use parking_lot::RwLock; -use rand::{distributions::Alphanumeric, Rng}; +use futures::{StreamExt, TryFutureExt}; +use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink}; use sc_rpc::utils::pipe_from_stream; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, @@ -49,7 +40,7 @@ use sc_transaction_pool_api::{ use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; /// An API for transaction RPC calls. pub struct Transaction { @@ -59,44 +50,12 @@ pub struct Transaction { pool: Arc, /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, - /// The brodcast operation IDs. - broadcast_ids: Arc>>, -} - -struct BroadcastState { - /// Handle to abort the running future that broadcasts the transaction. - handle: AbortHandle, } impl Transaction { /// Creates a new [`Transaction`]. pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { - Transaction { client, pool, executor, broadcast_ids: Default::default() } - } - - /// Generate an unique operation ID for the `transaction_broadcast` RPC method. - pub fn generate_unique_id(&self) -> String { - let generate_operation_id = || { - // The lenght of the operation ID. - const OPERATION_ID_LEN: usize = 16; - - let mut rng = rand::thread_rng(); - (&mut rng) - .sample_iter(Alphanumeric) - .take(OPERATION_ID_LEN) - .map(char::from) - .collect::() - }; - - let mut id = generate_operation_id(); - - let broadcast_ids = self.broadcast_ids.read(); - - while broadcast_ids.contains_key(&id) { - id = generate_operation_id(); - } - - id + Transaction { client, pool, executor } } } @@ -120,7 +79,7 @@ where Pool: TransactionPool + Sync + Send + 'static, Pool::Hash: Unpin, ::Hash: Unpin, - Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, + Client: HeaderBackend + Send + Sync + 'static, { fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); @@ -170,118 +129,6 @@ where sc_rpc::utils::spawn_subscription_task(&self.executor, fut); } - - fn broadcast(&self, bytes: Bytes) -> RpcResult> { - let pool = self.pool.clone(); - - // The unique ID of this operation. - let id = self.generate_unique_id(); - - let mut best_block_import_stream = - Box::pin(self.client.import_notification_stream().filter_map( - |notification| async move { notification.is_new_best.then_some(notification.hash) }, - )); - - let broadcast_transaction_fut = async move { - // There is nothing we could do with an extrinsic of invalid format. - let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { - return - }; - - // Flag to determine if the we should broadcast the transaction again. - let mut is_done = false; - - while !is_done { - // Wait for the next block to become available. - let Some(mut best_block_hash) = best_block_import_stream.next().await else { - return - }; - // We are effectively polling the stream for the last available item at this time. - // The `now_or_never` returns `None` if the stream is `Pending`. - // - // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. - while let Some(next) = best_block_import_stream.next().now_or_never() { - let Some(next) = next else { - // Nothing to do if the best block stream terminated. - return - }; - best_block_hash = next; - } - - let submit = - pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); - - // The transaction was not included to the pool, because it is invalid. - // However an invalid transaction can become valid at a later time. - let Ok(mut stream) = submit.await else { return }; - - while let Some(event) = stream.next().await { - match event { - // The transaction propagation stops when: - // - The transaction was included in a finalized block via - // `TransactionStatus::Finalized`. - TransactionStatus::Finalized(_) | - // - The block in which the transaction was included could not be finalized for - // more than 256 blocks via `TransactionStatus::FinalityTimeout`. This could - // happen when: - // - the finality gadget is lagging behing - // - the finality gadget is not available for the chain - TransactionStatus::FinalityTimeout(_) | - // - The transaction has been replaced by another transaction with identical tags - // (same sender and same account nonce). - TransactionStatus::Usurped(_) => { - is_done = true; - break; - }, - - // Dropped transaction may renter the pool at a later time, when other - // transactions have been finalized and remove from the pool. - TransactionStatus::Dropped | - // An invalid transaction may become valid at a later time. - TransactionStatus::Invalid => { - break; - }, - - // The transaction is still in the pool, the ready or future queue. - TransactionStatus::Ready | TransactionStatus::Future | - // Transaction has been broadcasted as intended. - TransactionStatus::Broadcast(_) | - // Transaction has been included in a block, but the block is not finalized yet. - TransactionStatus::InBlock(_) | - // Transaction has been retracted, but it may be included in a block at a later time. - TransactionStatus::Retracted(_) => (), - } - } - } - }; - - // Convert the future into an abortable future, for easily terminating it from the - // `transaction_stop` method. - let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); - // The future expected by the executor must be `Future` instead of - // `Future>`. - let fut = fut.map(drop); - - // Keep track of this entry and the abortable handle. - { - let mut broadcast_ids = self.broadcast_ids.write(); - broadcast_ids.insert(id.clone(), BroadcastState { handle }); - } - - sc_rpc::utils::spawn_subscription_task(&self.executor, fut); - - Ok(Some(id)) - } - - fn stop_broadcast(&self, operation_id: String) -> RpcResult<()> { - let mut broadcast_ids = self.broadcast_ids.write(); - - // TODO: Signal error on wrong operation ID. - let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { return Ok(()) }; - broadcast_state.handle.abort(); - - Ok(()) - } } /// The transaction's state that needs to be preserved between diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs new file mode 100644 index 000000000000..de2cdb2ac4df --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -0,0 +1,214 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API implementation for submitting transactions. + +use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; +use codec::Decode; +use futures::{FutureExt, StreamExt}; +use futures_util::stream::AbortHandle; +use jsonrpsee::core::{async_trait, RpcResult}; +use parking_lot::RwLock; +use rand::{distributions::Alphanumeric, Rng}; +use sc_client_api::BlockchainEvents; +use sc_transaction_pool_api::{ + BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, +}; +use sp_blockchain::HeaderBackend; +use sp_core::Bytes; +use sp_runtime::traits::Block as BlockT; +use std::{collections::HashMap, sync::Arc}; + +/// An API for transaction RPC calls. +pub struct TransactionBroadcast { + /// Substrate client. + client: Arc, + /// Transactions pool. + pool: Arc, + /// Executor to spawn subscriptions. + executor: SubscriptionTaskExecutor, + /// The brodcast operation IDs. + broadcast_ids: Arc>>, +} + +/// The state of a broadcast operation. +struct BroadcastState { + /// Handle to abort the running future that broadcasts the transaction. + handle: AbortHandle, +} + +impl TransactionBroadcast { + /// Creates a new [`Transaction`]. + pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { + TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } + } + + /// Generate an unique operation ID for the `transaction_broadcast` RPC method. + pub fn generate_unique_id(&self) -> String { + let generate_operation_id = || { + // The lenght of the operation ID. + const OPERATION_ID_LEN: usize = 16; + + let mut rng = rand::thread_rng(); + (&mut rng) + .sample_iter(Alphanumeric) + .take(OPERATION_ID_LEN) + .map(char::from) + .collect::() + }; + + let mut id = generate_operation_id(); + + let broadcast_ids = self.broadcast_ids.read(); + + while broadcast_ids.contains_key(&id) { + id = generate_operation_id(); + } + + id + } +} + +/// Currently we treat all RPC transactions as externals. +/// +/// Possibly in the future we could allow opt-in for special treatment +/// of such transactions, so that the block authors can inject +/// some unique transactions via RPC and have them included in the pool. +const TX_SOURCE: TransactionSource = TransactionSource::External; + +#[async_trait] +impl TransactionBroadcastApiServer> + for TransactionBroadcast +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Hash: Unpin, + ::Hash: Unpin, + Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, +{ + fn broadcast(&self, bytes: Bytes) -> RpcResult> { + let pool = self.pool.clone(); + + // The unique ID of this operation. + let id = self.generate_unique_id(); + + let mut best_block_import_stream = + Box::pin(self.client.import_notification_stream().filter_map( + |notification| async move { notification.is_new_best.then_some(notification.hash) }, + )); + + let broadcast_transaction_fut = async move { + // There is nothing we could do with an extrinsic of invalid format. + let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { + return + }; + + // Flag to determine if the we should broadcast the transaction again. + let mut is_done = false; + + while !is_done { + // Wait for the next block to become available. + let Some(mut best_block_hash) = best_block_import_stream.next().await else { + return + }; + // We are effectively polling the stream for the last available item at this time. + // The `now_or_never` returns `None` if the stream is `Pending`. + // + // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. + while let Some(next) = best_block_import_stream.next().now_or_never() { + let Some(next) = next else { + // Nothing to do if the best block stream terminated. + return + }; + best_block_hash = next; + } + + let submit = + pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); + + // The transaction was not included to the pool, because it is invalid. + // However an invalid transaction can become valid at a later time. + let Ok(mut stream) = submit.await else { return }; + + while let Some(event) = stream.next().await { + match event { + // The transaction propagation stops when: + // - The transaction was included in a finalized block via + // `TransactionStatus::Finalized`. + TransactionStatus::Finalized(_) | + // - The block in which the transaction was included could not be finalized for + // more than 256 blocks via `TransactionStatus::FinalityTimeout`. This could + // happen when: + // - the finality gadget is lagging behing + // - the finality gadget is not available for the chain + TransactionStatus::FinalityTimeout(_) | + // - The transaction has been replaced by another transaction with identical tags + // (same sender and same account nonce). + TransactionStatus::Usurped(_) => { + is_done = true; + break; + }, + + // Dropped transaction may renter the pool at a later time, when other + // transactions have been finalized and remove from the pool. + TransactionStatus::Dropped | + // An invalid transaction may become valid at a later time. + TransactionStatus::Invalid => { + break; + }, + + // The transaction is still in the pool, the ready or future queue. + TransactionStatus::Ready | TransactionStatus::Future | + // Transaction has been broadcasted as intended. + TransactionStatus::Broadcast(_) | + // Transaction has been included in a block, but the block is not finalized yet. + TransactionStatus::InBlock(_) | + // Transaction has been retracted, but it may be included in a block at a later time. + TransactionStatus::Retracted(_) => (), + } + } + } + }; + + // Convert the future into an abortable future, for easily terminating it from the + // `transaction_stop` method. + let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); + // The future expected by the executor must be `Future` instead of + // `Future>`. + let fut = fut.map(drop); + + // Keep track of this entry and the abortable handle. + { + let mut broadcast_ids = self.broadcast_ids.write(); + broadcast_ids.insert(id.clone(), BroadcastState { handle }); + } + + sc_rpc::utils::spawn_subscription_task(&self.executor, fut); + + Ok(Some(id)) + } + + fn stop_broadcast(&self, operation_id: String) -> RpcResult<()> { + let mut broadcast_ids = self.broadcast_ids.write(); + + // TODO: Signal error on wrong operation ID. + let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { return Ok(()) }; + broadcast_state.handle.abort(); + + Ok(()) + } +} From bacbf8f966ae558a466224877479cda96b4b9419 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:10:54 +0200 Subject: [PATCH 13/34] tx/tests: Rename tx rpc server to reflect refactoring changes Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 23 ++++++++----------- .../src/transaction/transaction_broadcast.rs | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index c1b55b7b70a1..6b55628c6549 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -1,25 +1,18 @@ use super::*; use crate::{ - chain_head::test_utils::ChainHeadMockClient, transaction::Transaction as RpcTransaction, + chain_head::test_utils::ChainHeadMockClient, hex_string, + transaction::TransactionBroadcast as RpcTransactionBroadcast, }; use codec::Encode; use futures::Future; use jsonrpsee::rpc_params; use sc_transaction_pool::*; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; -use sp_core::{ - hexdisplay::{AsBytesRef, HexDisplay}, - testing::TaskExecutor, -}; +use sp_core::testing::TaskExecutor; use std::{pin::Pin, sync::Arc, time::Duration}; use substrate_test_runtime_client::{prelude::*, AccountKeyring::*}; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; -/// Util function to encode a value as a hex string -pub fn hex_string(data: &Data) -> String { - format!("0x{:?}", HexDisplay::from(data)) -} - type Block = substrate_test_runtime_client::runtime::Block; fn create_basic_pool_with_genesis( @@ -59,13 +52,15 @@ async fn tx_broadcast_enters_pool() { let xt = hex_string(&uxt.encode()); let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); - let tx_api = - RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(TaskExecutor::default())) - .into_rpc(); + let tx_api = RpcTransactionBroadcast::new( + client_mock.clone(), + pool.clone(), + Arc::new(TaskExecutor::default()), + ) + .into_rpc(); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); - let _block_1 = api.expect_hash_from_number(1); let _operation_id: String = tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index de2cdb2ac4df..34d424e280c1 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! API implementation for submitting transactions. +//! API implementation for broadcasting transactions. use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; use codec::Decode; From da4f247dee905977e45e6d83026d56bd0889332d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:36:14 +0200 Subject: [PATCH 14/34] tx/tests: Improve future testability by extracting a setup helper Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index 6b55628c6549..ec20b04f2048 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -5,16 +5,19 @@ use crate::{ }; use codec::Encode; use futures::Future; -use jsonrpsee::rpc_params; +use jsonrpsee::{rpc_params, RpcModule}; use sc_transaction_pool::*; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; use sp_core::testing::TaskExecutor; use std::{pin::Pin, sync::Arc, time::Duration}; -use substrate_test_runtime_client::{prelude::*, AccountKeyring::*}; +use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client}; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; type Block = substrate_test_runtime_client::runtime::Block; +/// Initial Alice account nonce. +const ALICE_NONCE: u64 = 209; + fn create_basic_pool_with_genesis( test_api: Arc, ) -> (BasicPool, Pin + Send>>) { @@ -31,7 +34,7 @@ fn create_basic_pool_with_genesis( } fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { - let api = Arc::new(TestApi::with_alice_nonce(209)); + let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); let thread_pool = futures::executor::ThreadPool::new().unwrap(); @@ -39,19 +42,21 @@ fn maintained_pool() -> (BasicPool, Arc, futures::execu (pool, api, thread_pool) } -#[tokio::test] -async fn tx_broadcast_enters_pool() { - let builder = TestClientBuilder::new(); - let _backend = builder.backend(); - let client = Arc::new(builder.build()); - +fn setup_api() -> ( + Arc, + Arc>, + Arc>>, + RpcModule< + TransactionBroadcast, ChainHeadMockClient>>, + >, +) { let (pool, api, _) = maintained_pool(); let pool = Arc::new(pool); - let uxt = uxt(Alice, 209); - let xt = hex_string(&uxt.encode()); - + let builder = TestClientBuilder::new(); + let client = Arc::new(builder.build()); let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + let tx_api = RpcTransactionBroadcast::new( client_mock.clone(), pool.clone(), @@ -59,9 +64,19 @@ async fn tx_broadcast_enters_pool() { ) .into_rpc(); + (api, pool, client_mock, tx_api) +} + +#[tokio::test] +async fn tx_broadcast_enters_pool() { + let (api, pool, client_mock, tx_api) = setup_api(); + // Start at block 1. let block_1_header = api.push_block(1, vec![], true); + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + let _operation_id: String = tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); From 442a20fa96d50e1c9ce6d96533081e5c19d0fc8f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:55:27 +0200 Subject: [PATCH 15/34] rpc-v2/tx: Add error for broadcast_stop Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/api.rs | 4 +-- .../rpc-spec-v2/src/transaction/error.rs | 27 +++++++++++++++++++ .../src/transaction/transaction_broadcast.rs | 9 +++++-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 9a583a27060d..d2b8ad624156 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -18,7 +18,7 @@ //! API trait for transactions. -use crate::transaction::event::TransactionEvent; +use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use sp_core::Bytes; @@ -56,5 +56,5 @@ pub trait TransactionBroadcastApi { /// /// This method is unstable and subject to change in the future. #[method(name = "transaction_unstable_stop", blocking)] - fn stop_broadcast(&self, operation_id: String) -> RpcResult<()>; + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; } diff --git a/substrate/client/rpc-spec-v2/src/transaction/error.rs b/substrate/client/rpc-spec-v2/src/transaction/error.rs index d2de07afd595..116977af6600 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/error.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/error.rs @@ -21,6 +21,7 @@ //! Errors are interpreted as transaction events for subscriptions. use crate::transaction::event::{TransactionError, TransactionEvent}; +use jsonrpsee::types::error::ErrorObject; use sc_transaction_pool_api::error::Error as PoolError; use sp_runtime::transaction_validity::InvalidTransaction; @@ -98,3 +99,29 @@ impl From for TransactionEvent { } } } + +/// TransactionBroadcast error. +#[derive(Debug, thiserror::Error)] +pub enum ErrorBroadcast { + /// The provided operation ID is invalid. + #[error("Invalid operation id")] + InvalidOperationID, +} + +/// General purpose errors, as defined in +/// . +pub mod json_rpc_spec { + /// Invalid parameter error. + pub const INVALID_PARAM_ERROR: i32 = -32602; +} + +impl From for ErrorObject<'static> { + fn from(e: ErrorBroadcast) -> Self { + let msg = e.to_string(); + + match e { + ErrorBroadcast::InvalidOperationID => + ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>), + } + } +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 34d424e280c1..85e9a253886b 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -34,6 +34,8 @@ use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; use std::{collections::HashMap, sync::Arc}; +use super::error::ErrorBroadcast; + /// An API for transaction RPC calls. pub struct TransactionBroadcast { /// Substrate client. @@ -202,11 +204,14 @@ where Ok(Some(id)) } - fn stop_broadcast(&self, operation_id: String) -> RpcResult<()> { + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> { let mut broadcast_ids = self.broadcast_ids.write(); // TODO: Signal error on wrong operation ID. - let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { return Ok(()) }; + let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { + return Err(ErrorBroadcast::InvalidOperationID) + }; + broadcast_state.handle.abort(); Ok(()) From 10cced361c936ae01faac2b799b89de7307089b2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jan 2024 18:55:40 +0200 Subject: [PATCH 16/34] tx/tests: Check broadcast stop Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 45 ++++++++++++++++++- .../src/transaction/transaction_broadcast.rs | 1 - 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index ec20b04f2048..d1934d21fd61 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -3,9 +3,10 @@ use crate::{ chain_head::test_utils::ChainHeadMockClient, hex_string, transaction::TransactionBroadcast as RpcTransactionBroadcast, }; +use assert_matches::assert_matches; use codec::Encode; use futures::Future; -use jsonrpsee::{rpc_params, RpcModule}; +use jsonrpsee::{core::error::Error, rpc_params, RpcModule}; use sc_transaction_pool::*; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; use sp_core::testing::TaskExecutor; @@ -77,7 +78,7 @@ async fn tx_broadcast_enters_pool() { let uxt = uxt(Alice, ALICE_NONCE); let xt = hex_string(&uxt.encode()); - let _operation_id: String = + let operation_id: String = tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); // Announce block 1 to `transaction_unstable_broadcast`. @@ -104,4 +105,44 @@ async fn tx_broadcast_enters_pool() { pool.maintain(event).await; assert_eq!(0, pool.status().ready); + + // Stop call can still be made. + let _: () = tx_api + .call("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap(); +} + +#[tokio::test] +async fn tx_broadcast_invalid_tx() { + let (_, _, _, tx_api) = setup_api(); + + // Invalid parameters. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" + ); + + // Invalid transaction that cannot be decoded. The broadcast silently exits. + let xt = "0xdeadbeef"; + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + // Make an invalid stop call. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); + + // Ensure stop can be called, the tx was decoded and the broadcast future terminated. + let _: () = tx_api + .call("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap(); } diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 85e9a253886b..92ff63e8a48c 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -207,7 +207,6 @@ where fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> { let mut broadcast_ids = self.broadcast_ids.write(); - // TODO: Signal error on wrong operation ID. let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { return Err(ErrorBroadcast::InvalidOperationID) }; From 909e72f9041cebda023439bc73788bce0ea340f5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jan 2024 11:34:23 +0200 Subject: [PATCH 17/34] rpc-v2/tx: Comment typo Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/transaction_broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 92ff63e8a48c..99e277aca355 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -165,7 +165,7 @@ where break; }, - // Dropped transaction may renter the pool at a later time, when other + // Dropped transaction may enter the pool at a later time, when other // transactions have been finalized and remove from the pool. TransactionStatus::Dropped | // An invalid transaction may become valid at a later time. From ea93a25090fbc18cad2b2d30d6b2ee58e2adff05 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jan 2024 13:45:34 +0200 Subject: [PATCH 18/34] prdoc: Add pr doc Signed-off-by: Alexandru Vasile --- prdoc/pr_3079.prdoc | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 prdoc/pr_3079.prdoc diff --git a/prdoc/pr_3079.prdoc b/prdoc/pr_3079.prdoc new file mode 100644 index 000000000000..c745c1ffbfe5 --- /dev/null +++ b/prdoc/pr_3079.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Implement transaction_unstable_broadcast and transaction_unstable_stop + +doc: + - audience: Node Dev + description: | + A new RPC class is added to handle transactions. The `transaction_unstable_broadcast` broadcasts + the provided transaction to the peers of the node, until the `transaction_unstable_stop` is called. + The APIs are marked as unstable and subject to change in the future. + To know if the transaction was added to the chain, users can decode the bodies of announced finalized blocks. + This is a low-level approach for `transactionWatch_unstable_submitAndWatch`. + +crates: [ ] From 9fdc472c94331d23bc7c459ac8ea23deb36bf28a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jan 2024 13:56:36 +0200 Subject: [PATCH 19/34] tx/tests: Add license Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index d1934d21fd61..3d8670b5e894 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -1,3 +1,21 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + use super::*; use crate::{ chain_head::test_utils::ChainHeadMockClient, hex_string, From f8ea133fd9a26437b92200a611146cec3e2fc06a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jan 2024 14:52:58 +0200 Subject: [PATCH 20/34] rpc-v2/tx: Fix docs Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/transaction_broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 99e277aca355..b82fad217ddd 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -55,7 +55,7 @@ struct BroadcastState { } impl TransactionBroadcast { - /// Creates a new [`Transaction`]. + /// Creates a new [`TransactionBroadcast`]. pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } } From 21932cbbac463bc282c00893fce02f48da7ae416 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 31 Jan 2024 12:03:57 +0200 Subject: [PATCH 21/34] rpc-v2/tx: Replace mut rand with rand::thread_rng Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/transaction_broadcast.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index b82fad217ddd..2c5eefcf52d2 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -66,8 +66,7 @@ impl TransactionBroadcast { // The lenght of the operation ID. const OPERATION_ID_LEN: usize = 16; - let mut rng = rand::thread_rng(); - (&mut rng) + rand::thread_rng() .sample_iter(Alphanumeric) .take(OPERATION_ID_LEN) .map(char::from) From 015c87b6d00b19250a1b4dd5c772a1c5b57f8d1d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 15:11:50 +0200 Subject: [PATCH 22/34] rpc-v2/tx: Wrapper for the last available element of a stream Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction_broadcast.rs | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 2c5eefcf52d2..82561f59ad74 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -20,7 +20,7 @@ use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; use codec::Decode; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, Stream, StreamExt}; use futures_util::stream::AbortHandle; use jsonrpsee::core::{async_trait, RpcResult}; use parking_lot::RwLock; @@ -122,21 +122,12 @@ where let mut is_done = false; while !is_done { - // Wait for the next block to become available. - let Some(mut best_block_hash) = best_block_import_stream.next().await else { + // Wait for the last block to become available. + let Some(best_block_hash) = + last_stream_element(&mut best_block_import_stream).await + else { return }; - // We are effectively polling the stream for the last available item at this time. - // The `now_or_never` returns `None` if the stream is `Pending`. - // - // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. - while let Some(next) = best_block_import_stream.next().now_or_never() { - let Some(next) = next else { - // Nothing to do if the best block stream terminated. - return - }; - best_block_hash = next; - } let submit = pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); @@ -215,3 +206,25 @@ where Ok(()) } } + +/// Returns the last element of the providided stream, or `None` if the stream is closed. +async fn last_stream_element(stream: &mut S) -> Option +where + S: Stream + Unpin, +{ + let Some(mut element) = stream.next().await else { return None }; + + // We are effectively polling the stream for the last available item at this time. + // The `now_or_never` returns `None` if the stream is `Pending`. + // + // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. + while let Some(next) = stream.next().now_or_never() { + let Some(next) = next else { + // Nothing to do if the stream terminated. + return None + }; + element = next; + } + + Some(element) +} From 301f3bf7ffbc78b34136456cf50e9911579dbd72 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 15:22:18 +0200 Subject: [PATCH 23/34] tx/tests: Check `last_stream_element` returns the last element Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction_broadcast.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 82561f59ad74..03d7cdf26a45 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -228,3 +228,31 @@ where Some(element) } + +#[cfg(test)] +mod tests { + use super::*; + use tokio_stream::wrappers::ReceiverStream; + + #[tokio::test] + async fn check_last_stream_element() { + let (tx, rx) = tokio::sync::mpsc::channel(16); + + let mut stream = ReceiverStream::new(rx); + // Check the stream with one element queued. + tx.send(1).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(1)); + + // Check the stream with multiple elements. + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(3)); + + // Drop the stream with some elements + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + drop(tx); + assert_eq!(last_stream_element(&mut stream).await, None); + } +} From f174bf64544e2a154e493f33d50bd286f925e75e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 16:11:02 +0200 Subject: [PATCH 24/34] rpc-v2/tx: Continue broadcast on recoverable errors Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction_broadcast.rs | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 03d7cdf26a45..3eb1944e73da 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -27,6 +27,7 @@ use parking_lot::RwLock; use rand::{distributions::Alphanumeric, Rng}; use sc_client_api::BlockchainEvents; use sc_transaction_pool_api::{ + error::{Error as PoolError, IntoPoolError}, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; use sp_blockchain::HeaderBackend; @@ -97,6 +98,7 @@ impl TransactionBroadcastApiServer> for TransactionBroadcast where Pool: TransactionPool + Sync + Send + 'static, + Pool::Error: IntoPoolError, Pool::Hash: Unpin, ::Hash: Unpin, Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, @@ -115,7 +117,7 @@ where let broadcast_transaction_fut = async move { // There is nothing we could do with an extrinsic of invalid format. let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { - return + return; }; // Flag to determine if the we should broadcast the transaction again. @@ -126,15 +128,27 @@ where let Some(best_block_hash) = last_stream_element(&mut best_block_import_stream).await else { - return + return; }; - let submit = - pool.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()); - - // The transaction was not included to the pool, because it is invalid. - // However an invalid transaction can become valid at a later time. - let Ok(mut stream) = submit.await else { return }; + let mut stream = match pool + .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()) + .await + { + Ok(stream) => stream, + // The transaction was not included to the pool. + Err(e) => { + let Ok(pool_err) = e.into_pool_error() else { return }; + + if is_pool_error_recoverable(&pool_err) { + // Try to resubmit the transaction at a later block for recoverable + // errors. + continue; + } else { + return; + } + }, + }; while let Some(event) = stream.next().await { match event { @@ -229,6 +243,26 @@ where Some(element) } +/// Returns true if the pool error could be recoverable by resubmitting the transaction +/// at a later time. +fn is_pool_error_recoverable(err: &PoolError) -> bool { + match err { + // An invalid transaction is temporarily banned, however it can + // become valid at a later time. + PoolError::TemporarilyBanned | + // The pool is full at the moment. + PoolError::ImmediatelyDropped | + // The block id is not known to the pool. + // The node might be lagging behind, or during a warp sync. + PoolError::InvalidBlockId(_) | + // The pool is configured to not accept future transactions. + PoolError::RejectedFutureTransaction => { + true + } + _ => false + } +} + #[cfg(test)] mod tests { use super::*; From d28e197034081d718e9db5f6ca2e7e74d97209db Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 16:23:01 +0200 Subject: [PATCH 25/34] rpc-v2/tx: Make broadcast and stop methods non-blocking Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index f1ac39693212..d3ffecc19d3b 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -47,7 +47,7 @@ pub trait TransactionBroadcastApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "transaction_unstable_broadcast", blocking)] + #[method(name = "transaction_unstable_broadcast")] fn broadcast(&self, bytes: Bytes) -> RpcResult>; /// Broadcast an extrinsic to the chain. @@ -55,6 +55,6 @@ pub trait TransactionBroadcastApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "transaction_unstable_stop", blocking)] + #[method(name = "transaction_unstable_stop")] fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; } From 03b97b43948b90929c161759cffa3ec66ba6c9bb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 16:26:04 +0200 Subject: [PATCH 26/34] tx/tests: Ensure invalid tx does not enter the pool Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/tests.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index 3d8670b5e894..e16c5d9b384e 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -133,7 +133,7 @@ async fn tx_broadcast_enters_pool() { #[tokio::test] async fn tx_broadcast_invalid_tx() { - let (_, _, _, tx_api) = setup_api(); + let (_, pool, _, tx_api) = setup_api(); // Invalid parameters. let err = tx_api @@ -144,6 +144,8 @@ async fn tx_broadcast_invalid_tx() { Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" ); + assert_eq!(0, pool.status().ready); + // Invalid transaction that cannot be decoded. The broadcast silently exits. let xt = "0xdeadbeef"; let operation_id: String = @@ -158,6 +160,8 @@ async fn tx_broadcast_invalid_tx() { Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); + assert_eq!(0, pool.status().ready); + // Ensure stop can be called, the tx was decoded and the broadcast future terminated. let _: () = tx_api .call("transaction_unstable_stop", rpc_params![&operation_id]) From 58331aeb5b0d1ba95e5a8fe4b06cb34a8721e099 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 2 Feb 2024 16:38:09 +0200 Subject: [PATCH 27/34] tx/tests: Move invalid tx stop to a dedicated test Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index e16c5d9b384e..2718926f5683 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -151,6 +151,19 @@ async fn tx_broadcast_invalid_tx() { let operation_id: String = tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + assert_eq!(0, pool.status().ready); + + // Ensure stop can be called, the tx was decoded and the broadcast future terminated. + let _: () = tx_api + .call("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap(); +} + +#[tokio::test] +async fn tx_invalid_stop() { + let (_, _, _, tx_api) = setup_api(); + // Make an invalid stop call. let err = tx_api .call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"]) @@ -159,12 +172,4 @@ async fn tx_broadcast_invalid_tx() { assert_matches!(err, Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); - - assert_eq!(0, pool.status().ready); - - // Ensure stop can be called, the tx was decoded and the broadcast future terminated. - let _: () = tx_api - .call("transaction_unstable_stop", rpc_params![&operation_id]) - .await - .unwrap(); } From a27e3874b005c31a491a35db42e488b373a60207 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 5 Feb 2024 16:26:26 +0200 Subject: [PATCH 28/34] rpc-v2/tx: Adjust comment wrt TransactionStatus::FinalityTimeout Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/transaction_broadcast.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 3eb1944e73da..ce18e6a1fcd8 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -156,12 +156,6 @@ where // - The transaction was included in a finalized block via // `TransactionStatus::Finalized`. TransactionStatus::Finalized(_) | - // - The block in which the transaction was included could not be finalized for - // more than 256 blocks via `TransactionStatus::FinalityTimeout`. This could - // happen when: - // - the finality gadget is lagging behing - // - the finality gadget is not available for the chain - TransactionStatus::FinalityTimeout(_) | // - The transaction has been replaced by another transaction with identical tags // (same sender and same account nonce). TransactionStatus::Usurped(_) => { @@ -169,6 +163,8 @@ where break; }, + // The maximum number of finality watchers has been reached. + TransactionStatus::FinalityTimeout(_) | // Dropped transaction may enter the pool at a later time, when other // transactions have been finalized and remove from the pool. TransactionStatus::Dropped | From a0deac00dddb4cb77fabcca07dde2cc74f69c212 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 8 Feb 2024 18:05:59 +0200 Subject: [PATCH 29/34] rpc-v2/tx: Remove generic hash from the transaction broadcast API Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/api.rs | 2 +- .../rpc-spec-v2/src/transaction/transaction_broadcast.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index d3ffecc19d3b..33af9c953338 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -41,7 +41,7 @@ pub trait TransactionApi { } #[rpc(client, server)] -pub trait TransactionBroadcastApi { +pub trait TransactionBroadcastApi { /// Broadcast an extrinsic to the chain. /// /// # Unstable diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index ce18e6a1fcd8..dd801d596d07 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -28,7 +28,7 @@ use rand::{distributions::Alphanumeric, Rng}; use sc_client_api::BlockchainEvents; use sc_transaction_pool_api::{ error::{Error as PoolError, IntoPoolError}, - BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, + TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; use sp_blockchain::HeaderBackend; use sp_core::Bytes; @@ -94,8 +94,7 @@ impl TransactionBroadcast { const TX_SOURCE: TransactionSource = TransactionSource::External; #[async_trait] -impl TransactionBroadcastApiServer> - for TransactionBroadcast +impl TransactionBroadcastApiServer for TransactionBroadcast where Pool: TransactionPool + Sync + Send + 'static, Pool::Error: IntoPoolError, From 22c1a6b9a772af7bffede2340dc6588645bbc329 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 8 Feb 2024 18:10:03 +0200 Subject: [PATCH 30/34] rpc-v2/tx: Clean internal state on dropping the future broadcast Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/transaction_broadcast.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index dd801d596d07..c1a9e5baaf1e 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -188,9 +188,14 @@ where // Convert the future into an abortable future, for easily terminating it from the // `transaction_stop` method. let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); + let broadcast_ids = self.broadcast_ids.clone(); + let drop_id = id.clone(); // The future expected by the executor must be `Future` instead of // `Future>`. - let fut = fut.map(drop); + let fut = fut.map(move |_| { + // Remove the entry from the broadcast IDs map. + broadcast_ids.write().remove(&drop_id); + }); // Keep track of this entry and the abortable handle. { From 9fb01afdbc51fe7b30d9607ced10213cd075d1b3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 8 Feb 2024 18:11:35 +0200 Subject: [PATCH 31/34] rpc-v2/tx: Remove unpin requirements Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/transaction/transaction_broadcast.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index c1a9e5baaf1e..cae304389573 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -98,7 +98,6 @@ impl TransactionBroadcastApiServer for TransactionBroadcast::Hash: Unpin, Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, { From 68ab17dc34416668fd77a510d1d44d54484ba574 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 8 Feb 2024 18:34:44 +0200 Subject: [PATCH 32/34] tx/tests: Implement future executor for knowing when the tx finishes Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests.rs | 93 ++++++++++++++++--- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs index 2718926f5683..45477494768a 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -27,13 +27,66 @@ use futures::Future; use jsonrpsee::{core::error::Error, rpc_params, RpcModule}; use sc_transaction_pool::*; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; -use sp_core::testing::TaskExecutor; +use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; use std::{pin::Pin, sync::Arc, time::Duration}; use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client}; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; +use tokio::sync::mpsc; type Block = substrate_test_runtime_client::runtime::Block; +/// Wrap the `TaskExecutor` to know when the broadcast future is dropped. +#[derive(Clone)] +struct TaskExecutorBroadcast { + executor: TaskExecutor, + sender: mpsc::UnboundedSender<()>, +} + +/// The channel that receives events when the broadcast futures are dropped. +type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; + +impl TaskExecutorBroadcast { + /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures + /// are dropped. + fn new() -> (Self, TaskExecutorRecv) { + let (sender, recv) = mpsc::unbounded_channel(); + + (Self { executor: TaskExecutor::new(), sender }, recv) + } +} + +impl SpawnNamed for TaskExecutorBroadcast { + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn(name, group, future) + } + + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn_blocking(name, group, future) + } +} + /// Initial Alice account nonce. const ALICE_NONCE: u64 = 209; @@ -68,6 +121,7 @@ fn setup_api() -> ( RpcModule< TransactionBroadcast, ChainHeadMockClient>>, >, + TaskExecutorRecv, ) { let (pool, api, _) = maintained_pool(); let pool = Arc::new(pool); @@ -76,19 +130,18 @@ fn setup_api() -> ( let client = Arc::new(builder.build()); let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); - let tx_api = RpcTransactionBroadcast::new( - client_mock.clone(), - pool.clone(), - Arc::new(TaskExecutor::default()), - ) - .into_rpc(); + let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); + + let tx_api = + RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) + .into_rpc(); - (api, pool, client_mock, tx_api) + (api, pool, client_mock, tx_api, executor_recv) } #[tokio::test] async fn tx_broadcast_enters_pool() { - let (api, pool, client_mock, tx_api) = setup_api(); + let (api, pool, client_mock, tx_api, _) = setup_api(); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); @@ -133,7 +186,7 @@ async fn tx_broadcast_enters_pool() { #[tokio::test] async fn tx_broadcast_invalid_tx() { - let (_, pool, _, tx_api) = setup_api(); + let (_, pool, _, tx_api, mut exec_recv) = setup_api(); // Invalid parameters. let err = tx_api @@ -153,16 +206,26 @@ async fn tx_broadcast_invalid_tx() { assert_eq!(0, pool.status().ready); - // Ensure stop can be called, the tx was decoded and the broadcast future terminated. - let _: () = tx_api - .call("transaction_unstable_stop", rpc_params![&operation_id]) + // Await the broadcast future to exit. + // Without this we'd be subject to races, where we try to call the stop before the tx is + // dropped. + exec_recv.recv().await.unwrap(); + + // The broadcast future was dropped, and the operation is no longer active. + // When the operation is not active, either from the tx being finalized or a + // terminal error; the stop method should return an error. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id]) .await - .unwrap(); + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); } #[tokio::test] async fn tx_invalid_stop() { - let (_, _, _, tx_api) = setup_api(); + let (_, _, _, tx_api, _) = setup_api(); // Make an invalid stop call. let err = tx_api From ea83c6ab009005b81b463ac866e5c7d561559c63 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 8 Feb 2024 18:43:09 +0200 Subject: [PATCH 33/34] Update substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs Co-authored-by: Sebastian Kunert --- .../client/rpc-spec-v2/src/transaction/transaction_broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index cae304389573..617980ba63af 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -64,7 +64,7 @@ impl TransactionBroadcast { /// Generate an unique operation ID for the `transaction_broadcast` RPC method. pub fn generate_unique_id(&self) -> String { let generate_operation_id = || { - // The lenght of the operation ID. + // The length of the operation ID. const OPERATION_ID_LEN: usize = 16; rand::thread_rng() From 55d9eefad93eec9bde5b5f09d0145f9ea116282c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 12 Feb 2024 12:53:40 +0200 Subject: [PATCH 34/34] rpc-v2/tx: Use tx error wrappers Signed-off-by: Alexandru Vasile --- .../src/transaction/transaction_broadcast.rs | 72 +++++-------------- 1 file changed, 16 insertions(+), 56 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 617980ba63af..92c838261874 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -27,8 +27,7 @@ use parking_lot::RwLock; use rand::{distributions::Alphanumeric, Rng}; use sc_client_api::BlockchainEvents; use sc_transaction_pool_api::{ - error::{Error as PoolError, IntoPoolError}, - TransactionFor, TransactionPool, TransactionSource, TransactionStatus, + error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource, }; use sp_blockchain::HeaderBackend; use sp_core::Bytes; @@ -138,10 +137,10 @@ where Err(e) => { let Ok(pool_err) = e.into_pool_error() else { return }; - if is_pool_error_recoverable(&pool_err) { - // Try to resubmit the transaction at a later block for recoverable - // errors. - continue; + if pool_err.is_retriable() { + // Try to resubmit the transaction at a later block for + // recoverable errors. + continue } else { return; } @@ -149,36 +148,17 @@ where }; while let Some(event) = stream.next().await { - match event { - // The transaction propagation stops when: - // - The transaction was included in a finalized block via - // `TransactionStatus::Finalized`. - TransactionStatus::Finalized(_) | - // - The transaction has been replaced by another transaction with identical tags - // (same sender and same account nonce). - TransactionStatus::Usurped(_) => { - is_done = true; - break; - }, - - // The maximum number of finality watchers has been reached. - TransactionStatus::FinalityTimeout(_) | - // Dropped transaction may enter the pool at a later time, when other - // transactions have been finalized and remove from the pool. - TransactionStatus::Dropped | - // An invalid transaction may become valid at a later time. - TransactionStatus::Invalid => { - break; - }, - - // The transaction is still in the pool, the ready or future queue. - TransactionStatus::Ready | TransactionStatus::Future | - // Transaction has been broadcasted as intended. - TransactionStatus::Broadcast(_) | - // Transaction has been included in a block, but the block is not finalized yet. - TransactionStatus::InBlock(_) | - // Transaction has been retracted, but it may be included in a block at a later time. - TransactionStatus::Retracted(_) => (), + // Check if the transaction could be submitted again + // at a later time. + if event.is_retriable() { + break; + } + + // Stop if this is the final event of the transaction stream + // and the event is not retriable. + if event.is_final() { + is_done = true; + break; } } } @@ -242,26 +222,6 @@ where Some(element) } -/// Returns true if the pool error could be recoverable by resubmitting the transaction -/// at a later time. -fn is_pool_error_recoverable(err: &PoolError) -> bool { - match err { - // An invalid transaction is temporarily banned, however it can - // become valid at a later time. - PoolError::TemporarilyBanned | - // The pool is full at the moment. - PoolError::ImmediatelyDropped | - // The block id is not known to the pool. - // The node might be lagging behind, or during a warp sync. - PoolError::InvalidBlockId(_) | - // The pool is configured to not accept future transactions. - PoolError::RejectedFutureTransaction => { - true - } - _ => false - } -} - #[cfg(test)] mod tests { use super::*;