WIP: Add task::scope #2153
This change adds `task::scope` as a mechanism for supporting structured concurrency as described in tokio-rs#1879. The change adds a `task::scope` function which will forcefully cancel all child tasks when the scope is exited, as well as a `task::scope_with_options` function which allows overriding the default cancellation and drop behavior.
The `scope` implementation makes use of 2 primitives:
- CancellationToken: This allows signaling an arbitrary number of tasks to cancel
- WaitGroup: This allows waiting for outstanding tasks to complete
Both primitives are implemented using mechanisms and code from futures-intrusive.
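As a rough illustration of how the two primitives named in the description cooperate, here is a minimal thread-based sketch. It is not the PR's implementation: the PR's versions are async and built on intrusive wait queues, while this stand-in uses `std` threads, an `AtomicBool`, and a `Condvar`. The method names (`cancel`, `is_cancelled`, `add`, `done`, `wait`) and the `run_scope` helper are assumptions made for the example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical, thread-based stand-in for a cancellation token:
/// one flag that any number of workers can observe.
#[derive(Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}

/// Hypothetical wait group: a counter of outstanding workers plus a
/// condition variable that is notified when the counter reaches zero.
#[derive(Clone, Default)]
pub struct WaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl WaitGroup {
    pub fn add(&self) {
        *self.inner.0.lock().unwrap() += 1;
    }

    pub fn done(&self) {
        let mut count = self.inner.0.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            self.inner.1.notify_all();
        }
    }

    pub fn wait(&self) {
        let mut count = self.inner.0.lock().unwrap();
        while *count != 0 {
            count = self.inner.1.wait(count).unwrap();
        }
    }
}

/// Spawns `n` workers that spin until cancelled, cancels them when the
/// "scope" ends, waits for all of them, and returns how many finished.
pub fn run_scope(n: usize) -> usize {
    let token = CancellationToken::default();
    let group = WaitGroup::default();
    let finished = Arc::new(Mutex::new(0usize));

    for _ in 0..n {
        group.add();
        let (token, group, finished) = (token.clone(), group.clone(), finished.clone());
        thread::spawn(move || {
            while !token.is_cancelled() {
                thread::yield_now();
            }
            *finished.lock().unwrap() += 1;
            group.done();
        });
    }

    token.cancel(); // scope exit: signal every child
    group.wait(); // then wait for outstanding children
    let count = *finished.lock().unwrap();
    count
}
```

Calling `run_scope(4)` only returns once all four workers have observed the cancellation and reported completion, which is the ordering the scope relies on.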
This is very cool, and I'd like to take a closer look. I commented on a few questions & thoughts.
mod scope;
pub use scope::{scope, scope_with_options, ScopeOptions, ScopedJoinHandle, ScopeHandle, ScopeCancelBehavior, ScopeDropBehavior};
WDYT about, instead of calling these types `ScopeOptions`, `ScopeHandle`, `ScopeCancelBehavior`, etc, we make the `scope` module public, and just refer to them as `scope::Options`, `scope::Handle`, etc...
I'm flexible regarding those. However one concern I have with names like `Option` and `Handle` is that they are already used in some places. And if users need both of them (e.g. runtime `Handle` and scope `Handle`, which is not too unreasonable), they would either need to rename them or only import the module. But maybe that problem doesn't exist, since we expect them to import only `scope` and use the individual structs in a qualified fashion as you mentioned?
We encourage people to import only the module in `tracing-subscriber` and I was shocked as to how well that worked. For instance, we now have a `fmt::Layer` and a `layer::Layer` and I don't find them to be confusing in the slightest.
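To make the naming proposal concrete, here is a small self-contained sketch of module-qualified type names. The modules, the `describe` method, and the `cancel_on_exit` field are placeholders invented for the illustration; they are not tokio's or the PR's actual API.

```rust
/// Hypothetical stand-in for a runtime module.
pub mod runtime {
    pub struct Handle;

    impl Handle {
        pub fn describe(&self) -> &'static str {
            "runtime handle"
        }
    }
}

/// Hypothetical stand-in for a public `scope` module.
pub mod scope {
    pub struct Handle;

    impl Handle {
        pub fn describe(&self) -> &'static str {
            "scope handle"
        }
    }

    /// Placeholder options type; the field is invented for this sketch.
    #[derive(Default)]
    pub struct Options {
        pub cancel_on_exit: bool,
    }
}

/// Both `Handle` types are usable side by side without renaming,
/// because callers name them through their module.
pub fn describe_both() -> (&'static str, &'static str) {
    let rt = runtime::Handle;
    let sc = scope::Handle;
    (rt.describe(), sc.describe())
}
```

With this layout a caller never writes `use scope::Handle as ScopeHandle`; the module path carries the disambiguation.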
@@ -0,0 +1,255 @@
//! An asynchronously awaitable event for signalization between tasks
I don't think "signalization" is a word :)
For non native speakers it works fine 😂
I first wanted to say "please provide a recommendation for an update", but it's wrong anyway: the description was copied from futures-intrusive `ManualResetEvent`, but this is a `CancellationToken`
// The Event is can be sent to other threads as long as it's not borrowed
unsafe impl Send for CancellationToken {}
// The Event is thread-safe as long as the utilized Mutex is thread-safe
unsafe impl Sync for CancellationToken {}
I'm assuming this is inherited from `futures-intrusive`. AFAICT, we should automatically be `Send` + `Sync` without these impls, since we are not generic over a mutex?
Afaik the issue was the raw pointer for the linked waiter list which is stored inside the struct. As long as this is there the struct can't be `Send`, and the Mutex can't make it `Sync`.
I can check if those are still necessary, but I guess so.
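The point about the raw pointer can be reproduced in isolation. The sketch below uses hypothetical types (`WaitNode`, `TokenState`, `Token`) that only mimic the shape being discussed; the safety argument in the comment is an assumption of the sketch, not a claim about the PR's code.

```rust
use std::ptr;
use std::sync::Mutex;

/// Hypothetical waiter node, linked through a raw pointer as in an
/// intrusive list.
pub struct WaitNode {
    pub next: *mut WaitNode,
}

/// Hypothetical token state. The raw `head` pointer makes this type
/// neither `Send` nor `Sync`, and `Mutex<T>` is only `Send`/`Sync` when
/// `T: Send`, so the mutex alone cannot restore the auto traits.
pub struct TokenState {
    pub is_cancelled: bool,
    pub head: *mut WaitNode,
}

pub struct Token {
    state: Mutex<TokenState>,
}

impl Token {
    pub fn new() -> Self {
        Token {
            state: Mutex::new(TokenState {
                is_cancelled: false,
                head: ptr::null_mut(),
            }),
        }
    }

    pub fn cancel(&self) {
        self.state.lock().unwrap().is_cancelled = true;
    }

    pub fn is_cancelled(&self) -> bool {
        self.state.lock().unwrap().is_cancelled
    }
}

// SAFETY (assumption of this sketch): `head` is only read or written
// while the mutex is held, and every node it points to stays valid for
// as long as it is linked. Removing these impls makes
// `requires_send_sync::<Token>()` fail to compile.
unsafe impl Send for Token {}
unsafe impl Sync for Token {}

/// Compile-time check helper: only instantiable for `Send + Sync` types.
pub fn requires_send_sync<T: Send + Sync>() {}
```

Deleting the two `unsafe impl` lines and calling `requires_send_sync::<Token>()` is a quick way to check whether the manual impls are still needed after a refactor.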
// WaitForCancellationFuture only needs to get removed if it has been added to
// the wait queue of the Event. This has happened in the PollState::Waiting case.
if let PollState::Waiting = wait_node.state {
    if !unsafe { self.waiters.remove(wait_node) } {
In this method, we have exclusive mutable access to `self.waiters`. Naively, I would expect to be able to remove from a list with a `&mut` ref safely... I've not gotten to read the linked list implementation yet, though.
It's indeed safe as long as the list is consistent. But the outside API of the list at this point just forwards the `unsafe` annotations that are required internally. And since it mutates a raw pointer of non-owned elements, it is unsafe.
@@ -0,0 +1,628 @@
//! An intrusive double linked list of data
Kind of amusing to note that if we merge this PR as-is, tokio would now contain at least 4 separate linked-list implementations. :)
Am I correct that this linked list is inherently not thread safe? This should probably be stated in this comment...
Re 4 list implementations: Sounds indeed not ideal :-)
I confess I didn't perform an in-depth review on what is available up to now, since the main focus was to get `scope` working. This one here was mostly copy/paste to enable the necessary synchronization primitives.
Re thread-safety: It indeed is not. I thought not being `Sync` is indicator enough :-)
pub struct ScopedJoinHandle<'scope, T> {
    #[pin]
    handle: JoinHandle<Result<T, CancellableFutureError>>,
    phantom: core::marker::PhantomData<&'scope ()>,
I think that, since the phantomdata is unused, this should have a leading `_`?
indeed!
impl ScopeHandle {
    /// spawns a task on the scope
    pub fn spawn<'inner, T, R>(&'inner self, task: T) -> ScopedJoinHandle<'inner, R>
For ergonomics reasons, I think we probably ought to have a free-fn `spawn` (or `spawn_scoped` if it's exported at the top level?) that's available inside the scope, similar to `spawn_local`...
The non-free function has an advantage: It also constrains the lifetime of the join handles to the ones of the `ScopeHandle`. Since the `ScopeHandle` doesn't allow spawning outside of the scope, the `ScopedJoinHandle` can thereby also never resolve to a cancelled variant. If we would make this a free function there could either be no current scope available, or the scope might already have been cancelled.
Another thing is that `ScopeHandle`s can currently be cloned and stored in structs for spawning later, at least as long as that struct lives within the lifetime of a scope.
> If we would make this a free function there could either be no current scope available, or the scope might already have been cancelled.
I think it would be fine for a free function to panic in these cases; that's what the rest of `tokio` does...
> It also constrains the lifetime of the join handles to the ones of the `ScopeHandle`.
This, on the other hand, is a compelling reason to use the handle only. AFAICT, there isn't really any other way to constrain the scoped join handle lifetime. I think the ergonomics of a free function would be better, but I think the ability to return a `JoinHandle` that only lives as long as the scope is super valuable, so this may be the best approach!
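The lifetime argument can be seen in a stripped-down, synchronous sketch. Everything here is a simplification assumed for illustration: the "task" is a plain closure run eagerly, there is no executor, and `join` is a made-up method. Only the signature shape `spawn(&'inner self, ..) -> ScopedJoinHandle<'inner, R>` mirrors the diff above.

```rust
use std::marker::PhantomData;

/// Hypothetical scope handle; it carries no state in this sketch.
pub struct ScopeHandle {
    _private: (),
}

/// Join handle tied to the borrow of the `ScopeHandle` that produced it.
pub struct ScopedJoinHandle<'scope, R> {
    value: R,
    _phantom: PhantomData<&'scope ()>,
}

impl ScopeHandle {
    /// Mirrors the signature shape from the diff: the returned handle
    /// borrows `self` for `'inner`, so it cannot outlive the scope handle.
    /// The task is simply run eagerly here.
    pub fn spawn<'inner, R>(&'inner self, task: impl FnOnce() -> R) -> ScopedJoinHandle<'inner, R> {
        ScopedJoinHandle {
            value: task(),
            _phantom: PhantomData,
        }
    }
}

impl<'scope, R> ScopedJoinHandle<'scope, R> {
    /// Made-up accessor standing in for awaiting the handle.
    pub fn join(self) -> R {
        self.value
    }
}

/// The scope handle lives only for the duration of `f`. Because `R` is
/// chosen by the caller independently of the borrow passed to `f`, a
/// closure that tries to return the join handle itself is rejected by
/// the borrow checker.
pub fn scope<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
    let handle = ScopeHandle { _private: () };
    f(&handle)
}
```

A call such as `scope(|s| s.spawn(|| 21).join() * 2)` compiles, whereas `scope(|s| s.spawn(|| 21))` does not, since the handle would escape the scope. A free `spawn` function has no `&'inner self` to borrow from, which is the trade-off discussed above.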
futures::executor::block_on(async {
    let _ = child_task.await;
});
panic!("Spawn on cancelled Scope");
Since this panic will signal incorrect API use to a user, it would be nice if the panic message was a little more descriptive.
/// Whether tasks should be cancelled once the scope is exited
pub cancel_behavior: ScopeCancelBehavior,
/// How the scope should behave if it gets dropped instead of being `await`ed
pub drop_behavior: ScopeDropBehavior,
To avoid breaking changes, I think we should either add an empty private field here, so the struct has to be constructed like
ScopeOptions {
    cancel_behavior: /* whatever */,
    drop_behavior: /* whatever */,
    ..ScopeOptions::default()
}
or replace the public fields with a builder. That way, adding new options isn't a breaking change.
I think a builder is the way to go! One open question I had regarding this was whether the builder at the end should directly build the scope or just return the `Option`.
The first option would then look like:
ScopeBuilder::new()
    .set_cancel_behavior(ScopeCancelBehavior::Panic)
    .build(|scope| async move {
        scope.spawn(...);
    }).await;
I think that looks a bit heavy, maybe rather just build the `Options`
Hmm, do you think there's a use-case for building multiple scopes with the same `Options`?
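For reference, the "just build the options" variant could look roughly like the sketch below. The enum variant names, the defaults, and the `with_*` setter names are all assumptions made for the example; the thread does not settle them.

```rust
/// Hypothetical variants; the real enum in the PR may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScopeCancelBehavior {
    CancelChildTasks,
    ContinueChildTasks,
}

/// Hypothetical variants; the real enum in the PR may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScopeDropBehavior {
    BlockToCompletion,
    Panic,
}

/// Options with private fields: new fields can be added later without
/// breaking callers, because nobody outside can use struct-literal syntax.
#[derive(Clone, Debug)]
pub struct ScopeOptions {
    cancel_behavior: ScopeCancelBehavior,
    drop_behavior: ScopeDropBehavior,
}

impl Default for ScopeOptions {
    fn default() -> Self {
        // Assumed defaults: forced cancellation, block on drop.
        ScopeOptions {
            cancel_behavior: ScopeCancelBehavior::CancelChildTasks,
            drop_behavior: ScopeDropBehavior::BlockToCompletion,
        }
    }
}

impl ScopeOptions {
    pub fn new() -> Self {
        Self::default()
    }

    /// Chained setter (hypothetical name).
    pub fn with_cancel_behavior(mut self, behavior: ScopeCancelBehavior) -> Self {
        self.cancel_behavior = behavior;
        self
    }

    /// Chained setter (hypothetical name).
    pub fn with_drop_behavior(mut self, behavior: ScopeDropBehavior) -> Self {
        self.drop_behavior = behavior;
        self
    }

    pub fn cancel_behavior(&self) -> ScopeCancelBehavior {
        self.cancel_behavior
    }

    pub fn drop_behavior(&self) -> ScopeDropBehavior {
        self.drop_behavior
    }
}
```

Because the value is `Clone`, one configured `ScopeOptions` could be reused for several scopes, which bears on the question of whether sharing options across scopes is a real use case.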
    }
}

/// Internal state of the `CancellationToken` pair above
`CancellationToken` is now below, not above, and is not a pair. Suggest you just make this
/// Internal state of the `CancellationToken`
// the current executor thread to make progress, due to dependening on
// its IO handles. We need to do something along task::block_in_place
// to solve this.
futures::executor::block_on(wait_fut);
Internals are able to use our implementation of `block_on`: https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/enter.rs#L83
I haven't done a detailed review yet, but we should aim to get this merged sooner than later flagged w/ a
What if we move this to a top-level module:
+1 for a flatter module structure (and, IMO, shortening the names by referring them to
However, there is one minor issue exposing a top-level scope module: the free function
/// A handle to the scope, which allows to spawn child tasks
#[derive(Clone)]
pub struct ScopeHandle {
This probably could just be named `Scope`?
pub async fn scope<F, Fut, R>(scope_func: F) -> R
where
    F: FnOnce(ScopeHandle) -> Fut,
    Fut: Future<Output = R> + Send,
Could you clarify why `Send` is required here?
Reviewed first batch of code. Only minor nits so far!
/// The function is only safe as long as valid pointers are stored inside
/// the linked list.
pub(crate) unsafe fn add_front(&mut self, item: *mut ListNode<T>) {
    assert!(!item.is_null(), "Can not add null pointers");
Either change to `NonNull`, or this runtime assertion could be changed into a `debug_assert!` and added as a `Safety` invariant in documentation.
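A minimal sketch of the `NonNull` variant follows, with the remaining invariant stated under a rustdoc `# Safety` heading. The list here is a simplified stand-in written for this example, not the PR's `LinkedList`, and `front_to_back_demo` is a hypothetical helper that exists only to exercise it.

```rust
use std::ptr::NonNull;

/// Simplified intrusive node for this sketch.
pub struct ListNode<T> {
    pub data: T,
    prev: Option<NonNull<ListNode<T>>>,
    next: Option<NonNull<ListNode<T>>>,
}

impl<T> ListNode<T> {
    pub fn new(data: T) -> Self {
        ListNode { data, prev: None, next: None }
    }
}

/// Simplified intrusive list that does not own its nodes.
pub struct LinkedList<T> {
    head: Option<NonNull<ListNode<T>>>,
    tail: Option<NonNull<ListNode<T>>>,
}

impl<T> LinkedList<T> {
    pub fn new() -> Self {
        LinkedList { head: None, tail: None }
    }

    /// Adds a node at the front of the list.
    ///
    /// Taking `NonNull` moves the null check into the type, so no
    /// runtime assertion is needed.
    ///
    /// # Safety
    ///
    /// `item` must point to a valid node that is not already linked into
    /// a list, and the node must stay alive and unmoved while it is linked.
    pub unsafe fn add_front(&mut self, mut item: NonNull<ListNode<T>>) {
        unsafe {
            let node = item.as_mut();
            node.prev = None;
            node.next = self.head;
        }
        match self.head {
            Some(mut old_head) => unsafe { old_head.as_mut().prev = Some(item) },
            None => self.tail = Some(item),
        }
        self.head = Some(item);
    }
}

/// Hypothetical helper: links two stack-allocated nodes and walks the
/// list from head to tail, collecting the stored values.
pub fn front_to_back_demo() -> Vec<i32> {
    let mut a = ListNode::new(1);
    let mut b = ListNode::new(2);
    let mut list = LinkedList::new();
    let mut seen = Vec::new();
    unsafe {
        // Both nodes outlive the list and are not moved while linked.
        list.add_front(NonNull::from(&mut a));
        list.add_front(NonNull::from(&mut b));
        let mut cursor = list.head;
        while let Some(node) = cursor {
            seen.push(node.as_ref().data);
            cursor = node.as_ref().next;
        }
    }
    seen
}
```

The function still has to be `unsafe`, since validity and uniqueness of the pointer cannot be expressed in the type, but the non-null requirement no longer needs a runtime check or a doc line.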
}

/// Consumes the list and creates an iterator over the linked list.
/// This function is only safe as long as all pointers which are stored inside
Nice with these comments, but it would be nice if it (and others like it) followed rustdoc convention and lived under a `# Safety` section.
// further side effects.

let waiters = self.waiters.take();
Please document what assumptions are made here to make the `unsafe` use sound. Some places you already have, but same for the ones you haven't. Even if they are trivial it makes them easier to review and maintain in case the assumptions change!
    return false;
}

assert!(self.tail.is_null());
Could be a debug assertion? Unless this has safety implications which could arise from safe use at runtime. If that's the case, the panic should be documented - especially how it could arise from misuse.
/// Removes the last item from the linked list and returns it
#[allow(dead_code)]
pub(crate) unsafe fn remove_last(&mut self) -> *mut ListNode<T> {
This function doesn't look like it has to be `unsafe`. It has exclusive access, and all invariants are internally checked as far as I can see / understand. Otherwise, please document safety :D.
}

#[cfg(test)]
#[cfg(feature = "std")] // Tests make use of Vec at the moment
Feature `std` doesn't exist (yet), so can't run tests. I'm guessing probably just remove this?
}

#[test]
fn add_sorted() {
Testing function which doesn't exist `add_sorted`, guessing copy-paste mistake so should probably be removed?
This is currently paused, and will be rebased on top of #2263 once ready.
Any updates on this?
Superseded by #2576
The current PR is work-in-progress and mainly up for discussion.
One thing that definitely needs to be changed is the dependency on `futures::executor::block_on` for dropping a scope. Besides that, tests and docs are missing.
This change implements `scope` in a way where a forced cancellation of child tasks is the default. However, graceful cancellation is still possible if applications prefer that behavior. To achieve this, the automatic cancellation can be disabled, and users can utilize their own cancellation tokens to perform a graceful cancellation.