diff --git a/Cargo.lock b/Cargo.lock
index c93a0b7fdd28a..b2aca09fe2564 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7280,6 +7280,7 @@ dependencies = [
  "sp-keystore",
  "sp-panic-handler",
  "sp-runtime",
+ "sp-tracing",
  "sp-version",
  "tempfile",
  "thiserror",
diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml
index a5de78da487c8..ee4426b91dbb8 100644
--- a/client/cli/Cargo.toml
+++ b/client/cli/Cargo.toml
@@ -50,6 +50,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" }
 [dev-dependencies]
 tempfile = "3.1.0"
 futures-timer = "3.0.1"
+sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }
 
 [features]
 default = ["rocksdb"]
diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs
index d4191feddfa90..1a532b3bbc6fb 100644
--- a/client/cli/src/runner.rs
+++ b/client/cli/src/runner.rs
@@ -152,10 +152,38 @@ impl<C: SubstrateCli> Runner<C> {
 		//
 		// This is important to be done before we instruct the tokio runtime to shutdown. Otherwise
 		// the tokio runtime will wait the full 60 seconds for all tasks to stop.
-		drop(task_manager);
+		let task_registry = task_manager.into_task_registry();
 
 		// Give all futures 60 seconds to shutdown, before tokio "leaks" them.
-		self.tokio_runtime.shutdown_timeout(Duration::from_secs(60));
+		let shutdown_timeout = Duration::from_secs(60);
+		self.tokio_runtime.shutdown_timeout(shutdown_timeout);
+
+		let running_tasks = task_registry.running_tasks();
+
+		if !running_tasks.is_empty() {
+			log::error!("Detected running (potentially stalled) tasks on shutdown:");
+			running_tasks.iter().for_each(|(task, count)| {
+				let instances_desc =
+					if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
+
+				if task.is_default_group() {
+					log::error!(
+						"Task \"{}\" was still running {}after waiting {} seconds to finish.",
+						task.name,
+						instances_desc,
+						shutdown_timeout.as_secs(),
+					);
+				} else {
+					log::error!(
+						"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
+						task.name,
+						task.group,
+						instances_desc,
+						shutdown_timeout.as_secs(),
+					);
+				}
+			});
+		}
 
 		res.map_err(Into::into)
 	}
@@ -388,34 +416,75 @@ mod tests {
 		assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
 	}
 
+	fn run_test_in_another_process(
+		test_name: &str,
+		test_body: impl FnOnce(),
+	) -> Option<std::process::Output> {
+		if std::env::var("RUN_FORKED_TEST").is_ok() {
+			test_body();
+			None
+		} else {
+			let output = std::process::Command::new(std::env::current_exe().unwrap())
+				.arg(test_name)
+				.env("RUN_FORKED_TEST", "1")
+				.output()
+				.unwrap();
+
+			assert!(output.status.success());
+			Some(output)
+		}
+	}
+
 	/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
 	/// seconds, aka doesn't wait until they are finished (which may never happen).
 	#[test]
 	fn ensure_run_until_exit_is_not_blocking_indefinitely() {
-		let runner = create_runner();
+		let output = run_test_in_another_process(
+			"ensure_run_until_exit_is_not_blocking_indefinitely",
+			|| {
+				sp_tracing::try_init_simple();
+
+				let runner = create_runner();
+
+				runner
+					.run_node_until_exit(move |cfg| async move {
+						let task_manager =
+							TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
+						let (sender, receiver) = futures::channel::oneshot::channel();
+
+						// We need to use `spawn_blocking` here so that we get a dedicated thread
+						// for our future. This future is more blocking code that will never end.
+						task_manager.spawn_handle().spawn_blocking("test", None, async move {
+							let _ = sender.send(());
+							loop {
+								std::thread::sleep(Duration::from_secs(30));
+							}
+						});
+
+						task_manager.spawn_essential_handle().spawn_blocking(
+							"test2",
+							None,
+							async {
+								// Let's stop this essential task directly when our other task
+								// started. It will signal that the task manager should end.
+								let _ = receiver.await;
+							},
+						);
+
+						Ok::<_, sc_service::Error>(task_manager)
+					})
+					.unwrap_err();
+			},
+		);
 
-		runner
-			.run_node_until_exit(move |cfg| async move {
-				let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
-				let (sender, receiver) = futures::channel::oneshot::channel();
+		let Some(output) = output else { return };
 
-				// We need to use `spawn_blocking` here so that we get a dedicated thread for our
-				// future. This future is more blocking code that will never end.
-				task_manager.spawn_handle().spawn_blocking("test", None, async move {
-					let _ = sender.send(());
-					loop {
-						std::thread::sleep(Duration::from_secs(30));
-					}
-				});
-
-				task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
-					// Let's stop this essential task directly when our other task started.
-					// It will signal that the task manager should end.
-					let _ = receiver.await;
-				});
+		let stderr = dbg!(String::from_utf8(output.stderr).unwrap());
 
-				Ok::<_, sc_service::Error>(task_manager)
-			})
-			.unwrap_err();
+		assert!(
+			stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
+		);
+		assert!(!stderr
+			.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
 	}
 }
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 8b3a29ba4032a..1529b822ade32 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
 pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
 #[doc(hidden)]
 pub use std::{ops::Deref, result::Result, sync::Arc};
-pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};
+pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};
 
 const DEFAULT_PROTOCOL_ID: &str = "sup";
 
diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs
index 23265f9672555..d792122576444 100644
--- a/client/service/src/task_manager/mod.rs
+++ b/client/service/src/task_manager/mod.rs
@@ -24,12 +24,19 @@
 use futures::{
 	future::{pending, select, try_join_all, BoxFuture, Either},
 	Future, FutureExt, StreamExt,
 };
+use parking_lot::Mutex;
 use prometheus_endpoint::{
 	exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
 	Registry, U64,
 };
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
-use std::{panic, pin::Pin, result::Result};
+use std::{
+	collections::{hash_map::Entry, HashMap},
+	panic,
+	pin::Pin,
+	result::Result,
+	sync::Arc,
+};
 use tokio::runtime::Handle;
 use tracing_futures::Instrument;
@@ -72,6 +79,7 @@ pub struct SpawnTaskHandle {
 	on_exit: exit_future::Exit,
 	tokio_handle: Handle,
 	metrics: Option<Metrics>,
+	task_registry: TaskRegistry,
 }
 
 impl SpawnTaskHandle {
@@ -113,6 +121,7 @@ impl SpawnTaskHandle {
 	) {
 		let on_exit = self.on_exit.clone();
 		let metrics = self.metrics.clone();
+		let registry = self.task_registry.clone();
 
 		let group = match group.into() {
 			GroupName::Specific(var) => var,
@@ -129,6 +138,10 @@
 		}
 
 		let future = async move {
+			// Register the task and keep the "token" alive until the task is ended. Then this
+			// "token" will unregister this task.
+			let _registry_token = registry.register_task(name, group);
+
 			if let Some(metrics) = metrics {
 				// Add some wrappers around `task`.
 				let task = {
@@ -298,6 +311,8 @@ pub struct TaskManager {
 	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
 	/// task fails.
 	children: Vec<TaskManager>,
+	/// The registry of all running tasks.
+	task_registry: TaskRegistry,
 }
 
 impl TaskManager {
@@ -324,6 +339,7 @@
 			essential_failed_rx,
 			keep_alive: Box::new(()),
 			children: Vec::new(),
+			task_registry: Default::default(),
 		})
 	}
 
@@ -333,6 +349,7 @@
 			on_exit: self.on_exit.clone(),
 			tokio_handle: self.tokio_handle.clone(),
 			metrics: self.metrics.clone(),
+			task_registry: self.task_registry.clone(),
 		}
 	}
 
@@ -385,6 +402,14 @@
 	pub fn add_child(&mut self, child: TaskManager) {
 		self.children.push(child);
 	}
+
+	/// Consume `self` and return the [`TaskRegistry`].
+	///
+	/// This [`TaskRegistry`] can be used to check for still running tasks after this task manager
+	/// was dropped.
+	pub fn into_task_registry(self) -> TaskRegistry {
+		self.task_registry
+	}
 }
 
 #[derive(Clone)]
@@ -434,3 +459,74 @@
 		})
 	}
 }
+
+/// Ensures that a [`Task`] is unregistered when this object is dropped.
+struct UnregisterOnDrop {
+	task: Task,
+	registry: TaskRegistry,
+}
+
+impl Drop for UnregisterOnDrop {
+	fn drop(&mut self) {
+		let mut tasks = self.registry.tasks.lock();
+
+		if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
+			*entry.get_mut() -= 1;
+
+			if *entry.get() == 0 {
+				entry.remove();
+			}
+		}
+	}
+}
+
+/// Represents a running async task in the [`TaskManager`].
+///
+/// As a task is identified by a name and a group, it is totally valid that there exist multiple
+/// tasks with the same name and group.
+#[derive(Clone, Hash, Eq, PartialEq)]
+pub struct Task {
+	/// The name of the task.
+	pub name: &'static str,
+	/// The group this task is associated to.
+	pub group: &'static str,
+}
+
+impl Task {
+	/// Returns `true` if the `group` is the [`DEFAULT_GROUP_NAME`].
+	pub fn is_default_group(&self) -> bool {
+		self.group == DEFAULT_GROUP_NAME
+	}
+}
+
+/// Keeps track of all running [`Task`]s in [`TaskManager`].
+#[derive(Clone, Default)]
+pub struct TaskRegistry {
+	tasks: Arc<Mutex<HashMap<Task, usize>>>,
+}
+
+impl TaskRegistry {
+	/// Register a task with the given `name` and `group`.
+	///
+	/// Returns an [`UnregisterOnDrop`] that ensures that the task is unregistered when this value
+	/// is dropped.
+	fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
+		let task = Task { name, group };
+
+		{
+			let mut tasks = self.tasks.lock();
+
+			*(*tasks).entry(task.clone()).or_default() += 1;
+		}
+
+		UnregisterOnDrop { task, registry: self.clone() }
+	}
+
+	/// Returns the running tasks.
+	///
+	/// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The
+	/// number per task represents the concurrently running tasks with the same identifier.
+	pub fn running_tasks(&self) -> HashMap<Task, usize> {
+		(*self.tasks.lock()).clone()
+	}
+}
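For reference (not part of the diff): a minimal sketch of how an embedder other than `sc-cli` might consume the new shutdown diagnostics via `TaskManager::into_task_registry` and `TaskRegistry::running_tasks`, mirroring what `Runner::run_node_until_exit` does above. The `report_stalled_tasks` helper and its arguments are illustrative only.

```rust
use std::time::Duration;

// Sketch only: `task_manager` and `tokio_runtime` are assumed to be owned by the
// embedder, exactly as in `Runner::run_node_until_exit` in this diff.
fn report_stalled_tasks(
	task_manager: sc_service::TaskManager,
	tokio_runtime: tokio::runtime::Runtime,
) {
	// Keep the registry alive past the `TaskManager` so it can be inspected after shutdown.
	let task_registry = task_manager.into_task_registry();

	// Give tasks a grace period before tokio "leaks" them.
	let shutdown_timeout = Duration::from_secs(60);
	tokio_runtime.shutdown_timeout(shutdown_timeout);

	// Anything still registered at this point did not finish within the timeout.
	for (task, count) in task_registry.running_tasks() {
		log::error!(
			"Task \"{}\" (Group: {}) still running with {} instance(s) after {} seconds.",
			task.name,
			task.group,
			count,
			shutdown_timeout.as_secs(),
		);
	}
}
```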