Skip to content

Commit

Permalink
Print "stalled" task on shutdown (paritytech#13022)
Browse files Browse the repository at this point in the history
* Print "stalled" task on shutdown

When the node is shutting down, we give the Tokio runtime 60 seconds to shutdown. If after these 60
seconds there are still running tasks, we now print these tasks. This should help debugging nodes
that have stalled tasks.

This pr introduces a `TaskRegistry` that keeps track of all running tasks. Each task registers and
unregisters itself in this `TaskRegistry`.

* Fix rustdoc

* Update client/service/src/lib.rs
  • Loading branch information
bkchr authored Dec 28, 2022
1 parent d2ef4b0 commit 891d6a5
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" }
[dev-dependencies]
tempfile = "3.1.0"
futures-timer = "3.0.1"
sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }

[features]
default = ["rocksdb"]
Expand Down
117 changes: 93 additions & 24 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,38 @@ impl<C: SubstrateCli> Runner<C> {
//
// This is important to be done before we instruct the tokio runtime to shutdown. Otherwise
// the tokio runtime will wait the full 60 seconds for all tasks to stop.
drop(task_manager);
let task_registry = task_manager.into_task_registry();

// Give all futures 60 seconds to shutdown, before tokio "leaks" them.
self.tokio_runtime.shutdown_timeout(Duration::from_secs(60));
let shutdown_timeout = Duration::from_secs(60);
self.tokio_runtime.shutdown_timeout(shutdown_timeout);

let running_tasks = task_registry.running_tasks();

if !running_tasks.is_empty() {
log::error!("Detected running(potentially stalled) tasks on shutdown:");
running_tasks.iter().for_each(|(task, count)| {
let instances_desc =
if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };

if task.is_default_group() {
log::error!(
"Task \"{}\" was still running {}after waiting {} seconds to finish.",
task.name,
instances_desc,
shutdown_timeout.as_secs(),
);
} else {
log::error!(
"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
task.name,
task.group,
instances_desc,
shutdown_timeout.as_secs(),
);
}
});
}

res.map_err(Into::into)
}
Expand Down Expand Up @@ -388,34 +416,75 @@ mod tests {
assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
}

fn run_test_in_another_process(
test_name: &str,
test_body: impl FnOnce(),
) -> Option<std::process::Output> {
if std::env::var("RUN_FORKED_TEST").is_ok() {
test_body();
None
} else {
let output = std::process::Command::new(std::env::current_exe().unwrap())
.arg(test_name)
.env("RUN_FORKED_TEST", "1")
.output()
.unwrap();

assert!(output.status.success());
Some(output)
}
}

/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
/// seconds, aka doesn't wait until they are finished (which may never happen).
#[test]
fn ensure_run_until_exit_is_not_blocking_indefinitely() {
let runner = create_runner();
let output = run_test_in_another_process(
"ensure_run_until_exit_is_not_blocking_indefinitely",
|| {
sp_tracing::try_init_simple();

let runner = create_runner();

runner
.run_node_until_exit(move |cfg| async move {
let task_manager =
TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
let (sender, receiver) = futures::channel::oneshot::channel();

// We need to use `spawn_blocking` here so that we get a dedicated thread
// for our future. This future is more blocking code that will never end.
task_manager.spawn_handle().spawn_blocking("test", None, async move {
let _ = sender.send(());
loop {
std::thread::sleep(Duration::from_secs(30));
}
});

task_manager.spawn_essential_handle().spawn_blocking(
"test2",
None,
async {
// Let's stop this essential task directly when our other task
// started. It will signal that the task manager should end.
let _ = receiver.await;
},
);

Ok::<_, sc_service::Error>(task_manager)
})
.unwrap_err();
},
);

runner
.run_node_until_exit(move |cfg| async move {
let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
let (sender, receiver) = futures::channel::oneshot::channel();
let Some(output) = output else { return } ;

// We need to use `spawn_blocking` here so that we get a dedicated thread for our
// future. This future is more blocking code that will never end.
task_manager.spawn_handle().spawn_blocking("test", None, async move {
let _ = sender.send(());
loop {
std::thread::sleep(Duration::from_secs(30));
}
});

task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
// Let's stop this essential task directly when our other task started.
// It will signal that the task manager should end.
let _ = receiver.await;
});
let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

Ok::<_, sc_service::Error>(task_manager)
})
.unwrap_err();
assert!(
stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
);
assert!(!stderr
.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
}
}
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};
pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

Expand Down
98 changes: 97 additions & 1 deletion client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ use futures::{
future::{pending, select, try_join_all, BoxFuture, Either},
Future, FutureExt, StreamExt,
};
use parking_lot::Mutex;
use prometheus_endpoint::{
exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
Registry, U64,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{panic, pin::Pin, result::Result};
use std::{
collections::{hash_map::Entry, HashMap},
panic,
pin::Pin,
result::Result,
sync::Arc,
};
use tokio::runtime::Handle;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -72,6 +79,7 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
tokio_handle: Handle,
metrics: Option<Metrics>,
task_registry: TaskRegistry,
}

impl SpawnTaskHandle {
Expand Down Expand Up @@ -113,6 +121,7 @@ impl SpawnTaskHandle {
) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
let registry = self.task_registry.clone();

let group = match group.into() {
GroupName::Specific(var) => var,
Expand All @@ -129,6 +138,10 @@ impl SpawnTaskHandle {
}

let future = async move {
// Register the task and keep the "token" alive until the task is ended. Then this
// "token" will unregister this task.
let _registry_token = registry.register_task(name, group);

if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
Expand Down Expand Up @@ -298,6 +311,8 @@ pub struct TaskManager {
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<TaskManager>,
/// The registry of all running tasks.
task_registry: TaskRegistry,
}

impl TaskManager {
Expand All @@ -324,6 +339,7 @@ impl TaskManager {
essential_failed_rx,
keep_alive: Box::new(()),
children: Vec::new(),
task_registry: Default::default(),
})
}

Expand All @@ -333,6 +349,7 @@ impl TaskManager {
on_exit: self.on_exit.clone(),
tokio_handle: self.tokio_handle.clone(),
metrics: self.metrics.clone(),
task_registry: self.task_registry.clone(),
}
}

Expand Down Expand Up @@ -385,6 +402,14 @@ impl TaskManager {
pub fn add_child(&mut self, child: TaskManager) {
self.children.push(child);
}

/// Consume `self` and return the [`TaskRegistry`].
///
/// This [`TaskRegistry`] can be used to check for still running tasks after this task manager
/// was dropped.
pub fn into_task_registry(self) -> TaskRegistry {
self.task_registry
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -434,3 +459,74 @@ impl Metrics {
})
}
}

/// Ensures that a [`Task`] is unregistered when this object is dropped.
struct UnregisterOnDrop {
task: Task,
registry: TaskRegistry,
}

impl Drop for UnregisterOnDrop {
fn drop(&mut self) {
let mut tasks = self.registry.tasks.lock();

if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
*entry.get_mut() -= 1;

if *entry.get() == 0 {
entry.remove();
}
}
}
}

/// Represents a running async task in the [`TaskManager`].
///
/// As a task is identified by a name and a group, it is totally valid that there exists multiple
/// tasks with the same name and group.
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct Task {
/// The name of the task.
pub name: &'static str,
/// The group this task is associated to.
pub group: &'static str,
}

impl Task {
/// Returns if the `group` is the [`DEFAULT_GROUP_NAME`].
pub fn is_default_group(&self) -> bool {
self.group == DEFAULT_GROUP_NAME
}
}

/// Keeps track of all running [`Task`]s in [`TaskManager`].
#[derive(Clone, Default)]
pub struct TaskRegistry {
tasks: Arc<Mutex<HashMap<Task, usize>>>,
}

impl TaskRegistry {
/// Register a task with the given `name` and `group`.
///
/// Returns [`UnregisterOnDrop`] that ensures that the task is unregistered when this value is
/// dropped.
fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
let task = Task { name, group };

{
let mut tasks = self.tasks.lock();

*(*tasks).entry(task.clone()).or_default() += 1;
}

UnregisterOnDrop { task, registry: self.clone() }
}

/// Returns the running tasks.
///
/// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The
/// number per task represents the concurrently running tasks with the same identifier.
pub fn running_tasks(&self) -> HashMap<Task, usize> {
(*self.tasks.lock()).clone()
}
}

0 comments on commit 891d6a5

Please sign in to comment.