diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 88d5e6b6a99..7633299b302 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,7 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task}; +pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, SpawnError, Spawner, Task}; cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f73868ee9e7..009b3226e28 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, ToHandle}; use std::collections::{HashMap, VecDeque}; use std::fmt; +use std::io; use std::time::Duration; pub(crate) struct BlockingPool { @@ -82,6 +83,26 @@ pub(crate) enum Mandatory { NonMandatory, } +pub(crate) enum SpawnError { + /// Pool is shutting down and the task was not scheduled + ShuttingDown, + /// There are no worker threads available to take the task + /// and the OS failed to spawn a new one + NoThreads(io::Error), +} + +#[allow(clippy::from_over_into)] // Orphan rules +impl Into for SpawnError { + fn into(self) -> io::Error { + match self { + Self::ShuttingDown => { + io::Error::new(io::ErrorKind::Other, "blocking pool shutting down") + } + Self::NoThreads(e) => e, + } + } +} + impl Task { pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { Task { task, mandatory } @@ -220,7 +241,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> { + pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), SpawnError> { let mut shared = self.inner.shared.lock(); if shared.shutdown { @@ -230,7 +251,7 @@ impl Spawner { task.task.shutdown(); // no need to even push this task; it would never get picked up - return Err(()); + return Err(SpawnError::ShuttingDown); } shared.queue.push_back(task); @@ -261,7 +282,7 @@ impl Spawner { Err(e) => { // The OS refused to spawn the thread and there is no thread // to pick up the task that has just been pushed to the queue. - panic!("OS can't spawn worker thread: {}", e) + return Err(SpawnError::NoThreads(e)); } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 14101070cd3..961668c0a74 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -341,7 +341,7 @@ impl HandleInner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, _was_spawned) = if cfg!(debug_assertions) + let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt) @@ -349,7 +349,14 @@ impl HandleInner { self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt) }; - join_handle + match spawn_result { + Ok(()) => join_handle, + // Compat: do not panic here, return the join_handle even though it will never resolve + Err(blocking::SpawnError::ShuttingDown) => join_handle, + Err(blocking::SpawnError::NoThreads(e)) => { + panic!("OS can't spawn worker thread: {}", e) + } + } } cfg_fs! { @@ -363,7 +370,7 @@ impl HandleInner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner( Box::new(func), blocking::Mandatory::Mandatory, @@ -379,7 +386,7 @@ impl HandleInner { ) }; - if was_spawned { + if spawn_result.is_ok() { Some(join_handle) } else { None @@ -394,7 +401,7 @@ impl HandleInner { is_mandatory: blocking::Mandatory, name: Option<&str>, rt: &dyn ToHandle, - ) -> (JoinHandle, bool) + ) -> (JoinHandle, Result<(), blocking::SpawnError>) where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -424,7 +431,7 @@ impl HandleInner { let spawned = self .blocking_spawner .spawn(blocking::Task::new(task, is_mandatory), rt); - (handle, spawned.is_ok()) + (handle, spawned) } } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 1c779c82386..dd5d4845692 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -99,7 +99,11 @@ impl<'a> Builder<'a> { /// [runtime handle]: crate::runtime::Handle /// [`Handle::spawn`]: crate::runtime::Handle::spawn #[track_caller] - pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> io::Result> + pub fn spawn_on( + &mut self, + future: Fut, + handle: &Handle, + ) -> io::Result> where Fut: Future + Send + 'static, Fut::Output: Send + 'static, @@ -138,7 +142,11 @@ impl<'a> Builder<'a> { /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local /// [`LocalSet`]: crate::task::LocalSet #[track_caller] - pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> io::Result> + pub fn spawn_local_on( + self, + future: Fut, + local_set: &LocalSet, + ) -> io::Result> where Fut: Future + 'static, Fut::Output: 'static, @@ -155,7 +163,10 @@ impl<'a> Builder<'a> { /// See [`task::spawn_blocking`](crate::task::spawn_blocking) /// for more details. #[track_caller] - pub fn spawn_blocking(self, function: Function) -> io::Result> + pub fn spawn_blocking( + self, + function: Function, + ) -> io::Result> where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, @@ -180,13 +191,13 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( + let (join_handle, spawn_result) = handle.as_inner().spawn_blocking_inner( function, Mandatory::NonMandatory, self.name, handle, ); - Ok(join_handle) + spawn_result.map(|()| join_handle).map_err(Into::into) } }