diff --git a/availability-store/src/lib.rs b/availability-store/src/lib.rs index 281441fb5e87..7abc444758f7 100644 --- a/availability-store/src/lib.rs +++ b/availability-store/src/lib.rs @@ -23,7 +23,7 @@ #![warn(missing_docs)] use futures::prelude::*; -use futures::{channel::{mpsc, oneshot}, task::Spawn}; +use futures::channel::{mpsc, oneshot}; use keystore::KeyStorePtr; use polkadot_primitives::{ Hash, Block, @@ -39,6 +39,7 @@ use client::{ }; use sp_api::{ApiExt, ProvideRuntimeApi}; use codec::{Encode, Decode}; +use sp_core::traits::SpawnNamed; use log::warn; @@ -174,7 +175,7 @@ impl Store { &self, wrapped_block_import: I, client: Arc

, - spawner: impl Spawn, + spawner: impl SpawnNamed, keystore: KeyStorePtr, ) -> ClientResult> where diff --git a/availability-store/src/worker.rs b/availability-store/src/worker.rs index 01872c177c0f..8a3898579f54 100644 --- a/availability-store/src/worker.rs +++ b/availability-store/src/worker.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::thread; use log::{error, info, trace, warn}; -use sp_blockchain::{Result as ClientResult}; +use sp_blockchain::Result as ClientResult; use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256}; use sp_api::{ApiExt, ProvideRuntimeApi}; use client::{ @@ -32,12 +32,13 @@ use consensus_common::{ ImportResult, import_queue::CacheKeyId, }; +use sp_core::traits::SpawnNamed; use polkadot_primitives::{Block, BlockId, Hash}; use polkadot_primitives::parachain::{ ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData, ValidatorPair, ErasureChunk, }; -use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}}; +use futures::{prelude::*, future::select, channel::{mpsc, oneshot}}; use futures::future::AbortHandle; use keystore::KeyStorePtr; @@ -641,7 +642,7 @@ impl AvailabilityBlockImport { pub(crate) fn new( client: Arc

, block_import: I, - spawner: impl Spawn, + spawner: impl SpawnNamed, keystore: KeyStorePtr, to_worker: mpsc::UnboundedSender, ) -> Self @@ -662,9 +663,7 @@ impl AvailabilityBlockImport { to_worker.clone(), )); - if let Err(_) = spawner.spawn(prune_available.map(drop)) { - error!(target: LOG_TARGET, "Failed to spawn availability pruning task"); - } + spawner.spawn("polkadot-prune-availibility", prune_available.map(drop).boxed()); AvailabilityBlockImport { client, diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 7e3e0bd3420c..0215b33ee6a1 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -50,7 +50,7 @@ use std::sync::Arc; use std::time::Duration; use std::pin::Pin; -use futures::{future, Future, Stream, FutureExt, StreamExt, task::Spawn}; +use futures::{future, Future, Stream, FutureExt, StreamExt}; use log::warn; use sc_client_api::{StateBackend, BlockchainEvents}; use sp_blockchain::HeaderBackend; @@ -82,6 +82,7 @@ use polkadot_service_new::{ Error as ServiceError, FullNodeHandles, PolkadotClient, }; use sc_service::SpawnTaskHandle; +use sp_core::traits::SpawnNamed; const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -133,7 +134,7 @@ pub trait BuildParachainContext { Client::Api: RuntimeApiCollection, >::StateBackend: StateBackend>, Extrinsic: codec::Codec + Send + Sync + 'static, - SP: Spawn + Clone + Send + Sync + 'static; + SP: SpawnNamed + Clone + Send + Sync + 'static; } /// Parachain context needed for collation. @@ -233,7 +234,7 @@ fn build_collator_service( P::ParachainContext: Send + 'static, ::ProduceCandidate: Send, Extrinsic: service::Codec + Send + Sync + 'static, - SP: Spawn + Clone + Send + Sync + 'static, + SP: SpawnNamed + Clone + Send + Sync + 'static, { Err("Collator is not functional with the new service yet".into()) } diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs index 7e97eb688b15..7dea99656667 100644 --- a/network/src/legacy/gossip/mod.rs +++ b/network/src/legacy/gossip/mod.rs @@ -295,7 +295,7 @@ pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash { pub fn register_validator( service: Arc>, chain: C, - executor: &impl futures::task::Spawn, + executor: &impl sp_core::traits::SpawnNamed, ) -> RegisteredMessageValidator { let s = service.clone(); @@ -331,12 +331,7 @@ pub fn register_validator( let fut = futures::future::poll_fn(move |cx| { gossip_engine.lock().poll_unpin(cx) }); - let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut))); - - // Note: we consider the chances of an error to spawn a background task almost null. - if spawn_res.is_err() { - log::error!(target: "polkadot-gossip", "Failed to spawn background task"); - } + executor.spawn("polkadot-legacy-gossip-engine", fut.boxed()); } RegisteredMessageValidator { diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs index 0ed2d9ac4395..c36dbf1945ee 100644 --- a/network/src/protocol/mod.rs +++ b/network/src/protocol/mod.rs @@ -26,7 +26,7 @@ use codec::{Decode, Encode}; use futures::channel::{mpsc, oneshot}; use futures::future::Either; use futures::prelude::*; -use futures::task::{Spawn, SpawnExt, Context, Poll}; +use futures::task::{Context, Poll}; use futures::stream::{FuturesUnordered, StreamFuture}; use log::{debug, trace}; @@ -44,6 +44,7 @@ use polkadot_validation::{ use sc_network::{ObservedRole, Event, PeerId}; use sp_api::ProvideRuntimeApi; use sp_runtime::ConsensusEngineId; +use sp_core::traits::SpawnNamed; use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::pin::Pin; @@ -126,7 +127,9 @@ enum ServiceToWorkerMsg { /// Messages from a background task to the main worker task. enum BackgroundToWorkerMsg { // Spawn a given future. - Spawn(future::BoxFuture<'static, ()>), + // + // The name is used for the future task. + Spawn(&'static str, future::BoxFuture<'static, ()>), } /// Operations that a handle to an underlying network service should provide. @@ -221,7 +224,7 @@ pub fn start( C: ChainContext + 'static, Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, - SP: Spawn + Clone + Send + 'static, + SP: SpawnNamed + Clone + Send + 'static, { const SERVICE_TO_WORKER_BUF: usize = 256; @@ -234,67 +237,73 @@ pub fn start( chain_context, &executor, ); - executor.spawn(worker_loop( - config, - service.clone(), - gossip_validator, - api, - worker_receiver, - executor.clone(), - ))?; + executor.spawn( + "polkadot-network-worker", + worker_loop( + config, + service.clone(), + gossip_validator, + api, + worker_receiver, + executor.clone(), + ).boxed(), + ); let polkadot_service = Service { sender: worker_sender.clone(), network_service: service.clone(), }; - executor.spawn(async move { - while let Some(event) = event_stream.next().await { - let res = match event { - Event::Dht(_) => continue, - Event::NotificationStreamOpened { - remote, - engine_id, - role, - } => { - if engine_id != POLKADOT_ENGINE_ID { continue } - - worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await - }, - Event::NotificationStreamClosed { - remote, - engine_id, - } => { - if engine_id != POLKADOT_ENGINE_ID { continue } - - worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await - }, - Event::NotificationsReceived { - remote, - messages, - } => { - let our_notifications = messages.into_iter() - .filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID { - Some(message) - } else { - None - }) - .collect(); + executor.spawn( + "polkadot-network-notifications", + async move { + while let Some(event) = event_stream.next().await { + let res = match event { + Event::Dht(_) => continue, + Event::NotificationStreamOpened { + remote, + engine_id, + role, + } => { + if engine_id != POLKADOT_ENGINE_ID { continue } + + worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await + }, + Event::NotificationStreamClosed { + remote, + engine_id, + } => { + if engine_id != POLKADOT_ENGINE_ID { continue } - worker_sender.send( - ServiceToWorkerMsg::PeerMessage(remote, our_notifications) - ).await - } - }; + worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await + }, + Event::NotificationsReceived { + remote, + messages, + } => { + let our_notifications = messages.into_iter() + .filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID { + Some(message) + } else { + None + }) + .collect(); + + worker_sender.send( + ServiceToWorkerMsg::PeerMessage(remote, our_notifications) + ).await + } + }; - if let Err(e) = res { - // full is impossible here, as we've `await`ed the value being sent. - if e.is_disconnected() { - break + if let Err(e) = res { + // full is impossible here, as we've `await`ed the value being sent. + if e.is_disconnected() { + break + } } } - } - })?; + }.boxed(), + ); Ok(polkadot_service) } @@ -845,7 +854,7 @@ struct Worker { impl Worker where Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, - Sp: Spawn + Clone, + Sp: SpawnNamed + Clone, Gossip: GossipOps, { // spawns a background task to spawn consensus networking. @@ -888,14 +897,17 @@ impl Worker where // glue the incoming messages, shared table, and validation // work together. - let _ = self.executor.spawn(statement_import_loop( - relay_parent, - table, - self.api.clone(), - self.gossip_handle.clone(), - self.background_to_main_sender.clone(), - exit, - )); + self.executor.spawn( + "polkadot-statement-import-loop", + statement_import_loop( + relay_parent, + table, + self.api.clone(), + self.gossip_handle.clone(), + self.background_to_main_sender.clone(), + exit, + ).boxed(), + ); } fn handle_service_message(&mut self, message: ServiceToWorkerMsg) { @@ -932,12 +944,15 @@ impl Worker where // before placing in the pool, so we can safely check by candidate hash. let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle); - let _ = self.executor.spawn(async move { - let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; - if let Either::Left((pov_block, _)) = res { - let _ = sender.send(pov_block); - } - }); + self.executor.spawn( + "polkadot-fetch-pov-block", + async move { + let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; + if let Either::Left((pov_block, _)) = res { + let _ = sender.send(pov_block); + } + }.boxed(), + ); } ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => { let topic = crate::erasure_coding_topic(&candidate_hash); @@ -963,12 +978,15 @@ impl Worker where "gossip message streams do not conclude early; qed" )); - let _ = self.executor.spawn(async move { - let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; - if let Either::Left((chunk, _)) = res { - let _ = sender.send(chunk); - } - }); + self.executor.spawn( + "polkadot-fetch-erasure-chunk", + async move { + let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; + if let Either::Left((chunk, _)) = res { + let _ = sender.send(chunk); + } + }.boxed(), + ); } ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => { let topic = crate::erasure_coding_topic(&candidate_hash); @@ -1017,8 +1035,8 @@ impl Worker where fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) { match message { - BackgroundToWorkerMsg::Spawn(task) => { - let _ = self.executor.spawn(task); + BackgroundToWorkerMsg::Spawn(name, task) => { + let _ = self.executor.spawn(name, task); } } } @@ -1068,7 +1086,7 @@ async fn worker_loop( ) where Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, - Sp: Spawn + Clone, + Sp: SpawnNamed + Clone, { const BACKGROUND_TO_MAIN_BUF: usize = 16; @@ -1250,7 +1268,7 @@ async fn statement_import_loop( let work = future::select(work.boxed(), exit.clone()).map(drop); if let Err(_) = to_worker.send( - BackgroundToWorkerMsg::Spawn(work.boxed()) + BackgroundToWorkerMsg::Spawn("polkadot-statement-import-loop-sub-task", work.boxed()) ).await { // can fail only if remote has hung up - worker is dead, // we should die too. this is defensive, since the exit future diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index 0e78479fb260..049af3f5aca7 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -30,11 +30,24 @@ use av_store::{Store as AvailabilityStore, ErasureNetworking}; use sc_network_gossip::TopicNotification; use sp_api::{ApiRef, ProvideRuntimeApi}; use sp_runtime::traits::Block as BlockT; -use sp_core::crypto::Pair; +use sp_core::{crypto::Pair, traits::SpawnNamed}; use sp_keyring::Sr25519Keyring; -use futures::executor::LocalPool; -use futures::task::LocalSpawnExt; +use futures::executor::{LocalPool, LocalSpawner}; +use futures::task::{LocalSpawnExt, SpawnExt}; + +#[derive(Clone)] +struct Executor(LocalSpawner); + +impl SpawnNamed for Executor { + fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_local(future).unwrap(); + } + + fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.spawn(name, future); + } +} #[derive(Default)] pub struct MockNetworkOps { @@ -243,7 +256,7 @@ fn test_setup(config: Config) -> ( mock_gossip.clone(), api.clone(), worker_rx, - pool.spawner(), + Executor(pool.spawner()), ); let service = Service { diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index a67088ff503d..d84b1be078ad 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -26,13 +26,12 @@ //! //! These attestation sessions are kept live until they are periodically garbage-collected. -use std::{time::{Duration, Instant}, sync::Arc, pin::Pin}; -use std::collections::HashMap; +use std::{time::{Duration, Instant}, sync::Arc, pin::Pin, collections::HashMap}; use crate::pipeline::FullOutput; use sc_client_api::{BlockchainEvents, BlockBackend}; use consensus::SelectChain; -use futures::{prelude::*, task::{Spawn, SpawnExt}}; +use futures::prelude::*; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, @@ -42,17 +41,15 @@ use keystore::KeyStorePtr; use sp_api::{ProvideRuntimeApi, ApiExt}; use runtime_primitives::traits::HashFor; use availability_store::Store as AvailabilityStore; +use primitives::traits::SpawnNamed; use ansi_term::Colour; -use log::{warn, error, info, debug, trace}; +use log::{warn, info, debug, trace}; use super::{Network, Collators, SharedTable, TableRouter}; use crate::Error; use crate::pipeline::ValidationPool; -/// A handle to spawn background tasks onto. -pub type TaskExecutor = Arc; - // Remote processes may request for a validation instance to be cloned or instantiated. // They send a oneshot channel. type ValidationInstanceRequest = ( @@ -148,7 +145,7 @@ impl ServiceBuilder where N::BuildTableRouter: Send + Unpin + 'static, ::SendLocalCollation: Send, SC: SelectChain + 'static, - SP: Spawn + Send + 'static, + SP: SpawnNamed + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sp_api::StateBackendFor: sp_api::StateBackend>, { @@ -337,7 +334,7 @@ impl ParachainValidationInstances where N::TableRouter: Send + 'static + Sync, ::SendLocalCollation: Send, N::BuildTableRouter: Unpin + Send + 'static, - SP: Spawn + Send + 'static, + SP: SpawnNamed + Send + 'static, CF: CollationFetch + Clone + Send + Sync + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sp_api::StateBackendFor: sp_api::StateBackend>, @@ -453,19 +450,16 @@ impl ParachainValidationInstances where let collation_fetch = self.collation_fetch.clone(); let router = router.clone(); - let res = self.spawner.spawn( + self.spawner.spawn( + "polkadot-parachain-validation-work", launch_work( move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators), availability_store, router, n_validators, index, - ), + ).boxed(), ); - - if let Err(e) = res { - error!(target: "validation", "Failed to launch work: {:?}", e); - } } let tracker = ValidationInstanceHandle { @@ -549,7 +543,7 @@ async fn launch_work( #[cfg(test)] mod tests { use super::*; - use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc}; + use futures::{executor, future::ready, channel::mpsc}; use availability_store::ErasureNetworking; use polkadot_primitives::parachain::{ PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex, @@ -559,6 +553,7 @@ mod tests { use runtime_primitives::traits::Block as BlockT; use std::pin::Pin; use sp_keyring::sr25519::Keyring; + use primitives::testing::SpawnBlockingExecutor; /// Events fired while running mock implementations to follow execution. enum Events { @@ -719,7 +714,7 @@ mod tests { #[test] fn launch_work_is_executed_properly() { - let executor = ThreadPool::new().unwrap(); + let executor = SpawnBlockingExecutor::new(); let keystore = keystore::Store::new_in_memory(); // Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator. @@ -759,7 +754,7 @@ mod tests { #[test] fn router_is_built_on_relay_chain_validator() { - let executor = ThreadPool::new().unwrap(); + let executor = SpawnBlockingExecutor::new(); let keystore = keystore::Store::new_in_memory(); // Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.