Structured Concurrency Support #1879

Closed
Matthias247 opened this issue Dec 2, 2019 · 50 comments

@Matthias247
Contributor

Matthias247 commented Dec 2, 2019

First of all a disclaimer: This issue is not yet a full proposal. This serves more as a collection of things to explore, and to gather feedback on interest.

What is structured concurrency?

Structured concurrency describes a programming paradigm. Concurrent tasks are structured in a fashion where there exist clean task hierarchies, and where the lifetime of all sub-tasks/child-tasks is constrained within the lifetime of their parent task.

The term was likely brought up first by Martin Sustrik in this blog post, and was a guiding idea behind the libdill library. @njsmith utilized the term in Notes on structured concurrency, or: Go statement considered harmful, and designed the Python Trio library around the paradigm. I highly recommend reading the blog post.

The paradigm has also been adopted by Kotlin coroutines. @elizarov gave a talk at HydraConf about structured concurrency and the evolution of Kotlin's async task model, which I also highly recommend watching. It provides some hints on things to look out for, and on what APIs could look like. Kotlin's documentation around coroutines is also a good resource.

Go adopted some support for structured concurrency with the errgroup package.

Benefits of structured concurrency

I again recommend checking out the linked resources, which also elaborate on this 😀

In short: Applying the structured concurrency paradigm can simplify reasoning about concurrent programs and thereby reduce errors. It helps prevent resource leaks, in the same fashion as RAII helps avoid leaks on a scope level. It might also allow for optimizations.

Examples around error reductions and simplifications

Here is one motivating example of how structured concurrency can simplify things:

We are building a web application A, which is intended to handle at least 1000 transactions per second. Internally each transaction requires a few concurrent interactions, which involve reaching out to remote services. When one of those transactions fails, we need to perform certain actions, e.g. call another service B for a cleanup or rollback. Without structured concurrency, we might have the idea to just do spawn(cleanup_task()) in order to do this. While this works, it has a side effect: cleanup tasks might still be running while the main webservice handler has already terminated. This sounds harmless at first, but can have surprising consequences: We obviously want our services to be resilient against overloads, so we limit the number of concurrent requests to 2000 via an async Semaphore. This works fine for our main service handler. But what happens if lots of transactions are failing? How many cleanup tasks can run at the same point in time? The answer is unfortunately that their number is effectively unbounded. Thereby our service can be overloaded through queuing up cleanup tasks - even though we protected ourselves against too many concurrent API calls. This can lead to large-scale outages in distributed systems.

By making sure all cleanup logic is performed inside the lifetime/scope of the main service handler, we can guarantee that the number of cleanup tasks is also bounded by our Semaphore.
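
As a rough illustration of this bound, here is a minimal sketch using blocking threads from the standard library instead of async tasks. The Semaphore type and the max_concurrent_cleanups function are hypothetical helpers written for this example and are not part of Tokio. Each handler performs its cleanup before it returns its permit, so the number of concurrent cleanups can never exceed the permit count.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;

/// Minimal blocking counting semaphore (hypothetical helper for this sketch).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `requests` failing transactions with at most `limit` handlers active
/// and returns the highest number of cleanups that ran at the same time.
fn max_concurrent_cleanups(requests: usize, limit: usize) -> usize {
    let semaphore = Semaphore::new(limit);
    let running = AtomicUsize::new(0);
    let max_seen = AtomicUsize::new(0);

    thread::scope(|scope| {
        for _ in 0..requests {
            scope.spawn(|| {
                semaphore.acquire();
                // The transaction failed: clean up *inside* the handler,
                // while the handler still holds its permit.
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                max_seen.fetch_max(now, Ordering::SeqCst);
                thread::yield_now(); // stand-in for the call to service B
                running.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            });
        }
    });

    max_seen.load(Ordering::SeqCst)
}
```

Calling max_concurrent_cleanups(32, 4) never reports more than 4, whereas a detached spawn(cleanup_task()) would not be covered by the permit at all.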

Another example could be applying configuration changes at runtime: While our service is running we want to be able to update its configuration. After the configuration change is applied, no transaction should still be utilizing the old configuration. What we need to do now is:

  • Disable the acceptor in order to drain requests before we can apply the config change
  • Wait for all ongoing transactions to complete
  • Cancel transactions if they take too long to complete.
  • Update the configuration
  • Restart the acceptor

Without a structured approach to concurrency, this is a lot more complicated than it sounds. Any old transaction might have spawned a subtask which might still be executing after we have updated the configuration. There is no easy way for the higher-level code to check whether everything has finished.

Potential for optimizations

The application of structured concurrency might allow for optimizations. E.g. we might be able to allow subtasks to borrow data inside the parent task's scope without the need for additional heap allocations. Since the exact mechanisms are however not yet designed, the exact potential is unknown.

Core requirements

I think the core requirements for structured concurrency are:

  • A parent task will only finish once all child tasks have finished
  • When tasks are spawned, they need to be spawned in the context of a parent task. The parent needs to remember its child tasks
  • Parent tasks need to have a mechanism to cancel child tasks
  • Errors in child tasks should lead the parent task to return an error as soon as possible, and all sibling tasks to get cancelled. This behavior is equivalent to the behavior of the try_join! macro in futures-rs.

Regarding the last point I am not sure whether automatic error propagation is a required point of structured concurrency and whether it can be achieved on a task level, but it definitely makes things easier.
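
The first three requirements can be illustrated with scoped threads from the standard library, which already behave this way for threads. This is only a thread-based analogy under stated assumptions, not a proposal for the Tokio API; parent_task and its counters are names made up for this sketch.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

/// Spawns `children` workers inside a scope, cancels them, and returns how
/// many of them had finished by the time the parent returned.
fn parent_task(children: usize) -> usize {
    let cancel = AtomicBool::new(false);
    let finished = AtomicUsize::new(0);

    thread::scope(|scope| {
        for _ in 0..children {
            // Children are spawned in the context of the parent scope,
            // which keeps track of them.
            scope.spawn(|| {
                // Children observe the parent's cancellation signal.
                while !cancel.load(Ordering::Acquire) {
                    thread::sleep(Duration::from_millis(1));
                }
                finished.fetch_add(1, Ordering::SeqCst);
            });
        }
        // The parent has a mechanism to cancel its children.
        cancel.store(true, Ordering::Release);
        // Leaving the scope blocks until every child has finished.
    });

    finished.load(Ordering::SeqCst)
}
```

Because the scope cannot be left before all children are done, parent_task(8) always observes 8 finished children.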

Do we actually need to have builtin support for this?

Rust's async/await mechanism already provides structured concurrency inside a particular task: By utilizing tools like select! or join! we can run multiple child-tasks which are constrained to the same lifetime - which is the current scope. This is not possible in Go or Kotlin - which require an explicit child-task to be spawned to achieve the behavior. Therefore the benefits might be lower.

I built an example in futures-intrusive around those mechanisms.

However the concurrency inside a certain task will not scale very well, due to requiring polling of all child Futures. Therefore real concurrent tasks will be required in most applications.

On this level we have a set of tools in our toolbox that allow us to structure our concurrent tasks manually:

  • Parent tasks can wait for child tasks to join via the use of Oneshot channels or the new JoinHandles
  • Parent tasks can forcefully cancel child tasks by just dropping them
  • Parent tasks can gracefully cancel child tasks by passing a cancellation signal.

However these tools all require explicit code in order to guarantee correctness. Builtin support for structured concurrency could improve on usability and allow more developers to use good and correct defaults.

And as mentioned earlier, I think builtin support could also allow for new usages, e.g. borrowing inside child tasks or potential scheduler improvements when switching between child tasks and parent tasks.

The following posts are now mainly a braindump around how these requirements could be fulfilled and how they align with existing mechanisms.

@Matthias247
Contributor Author

Terminology comparison Kotlin/Rust

Since there will be a few comparisons to Kotlin due to its super interesting implementation of structured concurrency, here is a short terminology comparison of concepts for people not familiar with it:

  • Jobs in Kotlin are comparable to tasks in Rust. They both represent an independent concurrent unit of execution. Note all the states of a Job, incl. the cancellation states. These do not yet have a counterpart in Rust.
  • CoroutineScopes provide a scope/container for starting child tasks. I think the current equivalents we have are the Runtime as a top level scope, and LocalSet and FuturesUnordered as smaller scopes.
  • Kotlin's equivalent of block_on, which is called runBlocking, also represents a CoroutineScope. It can spawn child-tasks, and its configuration can be overridden like in every other scope.
  • CoroutineContext and coroutineContext are similar to Rust's Context variable as well as the TLS state which is implicitly forwarded inside Tokio to enable interaction with the Tokio runtime.

@Matthias247
Contributor Author

Waiting for child tasks to complete

As mentioned in the requirements, parent tasks should be able to wait for child tasks to complete.
We were up to now able to do this via oneshot channels. Tokio 0.2 introduced JoinHandles for this, which could do the same job in a more efficient fashion. I think these are already a great foundation.

I think some of the remaining problems to solve here are:

Waiting for child tasks to complete with an absence of explicit joins

Even if the user does not explicitly join the tasks, we need to wait until they have completed.
I think there are 3 ways to do this:

  1. When the JoinHandle gets dropped, cancel the child task. This is proposed in Consider making JoinHandle cancel on drop #1830
  2. We can add an explicit TaskGroup/TaskSet/TaskScope/CoroutineScope/Nursery type, which acts as a parent for all child tasks.
  3. The runtime can await all non-joined child tasks before it signals the parent task as finished/joinable.

The second approach reflects best what the other frameworks are doing. However I think those also have different constraints. They can not necessarily run as much implicit code as we can, due to not supporting destructors. Therefore the preferred approach might turn out different.

With the second approach we might have code like the following, which waits on a TaskGroup instead of a JoinHandle:

// Sketch of a hypothetical TaskGroup API
async fn parent_task() {
    TaskGroup::new(|group| async {
        group.spawn(async {
             doSth().await;
             group.cancel(); // Cancel the scope and all child tasks
        });
    }).await; // This waits for all child tasks to complete
}

with potential TaskGroup propagation via TLS. This also looks remarkably similar to what is already offered as FuturesUnordered in futures-rs, or the new tokio::task::LocalSet API. All those APIs provide mechanisms to manage subtasks. However their use-cases are slightly different, ranging from just being able to manage child tasks more efficiently to being able to configure the dispatcher. It seems like Kotlin's CoroutineScope might provide a superset of this functionality, by being able to set a default dispatcher as well as being able to provide a lifetime bound for concurrent operations.

The third approach actually seems similar, with the addition that each Task is also a Group which acts as the parent of its child tasks. That means each Task would need to store a list of child tasks. I think that can be supported with low overhead in Rust, since we can store the tasks as an intrusive list of child pointers.

Spawning and waiting for child tasks could be achieved by wrapping all spawned tasks in an async fn like this:

// Pseudocode
fn tokio::spawn(f: Future) {
    let task = Task::new(f);
    CURRENT_TASK.add_child(task);
    runtime.spawn(async {
        // Await the main task
        task.future.await;
        // Await all child tasks that have not yet finished
        for child in task.child_tasks {
            child.await;
        }
        // Only signal the parent task as done, when all child tasks have completed
        task.join_handle.signal_done();
    });
}

Thereby the JoinHandle of a parent task would never be ready before all child tasks have completed.
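
To make the wrapping idea more concrete, here is a minimal sketch that models tasks as OS threads. TaskContext, spawn_task and completion_order are hypothetical names used only for this illustration, and a plain Vec stands in for the intrusive child list. The wrapper joins all registered children before its own JoinHandle can complete.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Handle through which a task body registers child tasks (hypothetical type).
struct TaskContext {
    children: Vec<JoinHandle<()>>,
}

impl TaskContext {
    /// Spawns a child task and remembers it in the parent's child list.
    fn spawn_child<F>(&mut self, body: F)
    where
        F: FnOnce(&mut TaskContext) + Send + 'static,
    {
        self.children.push(spawn_task(body));
    }
}

/// Spawns a task whose JoinHandle only completes after all of its children.
fn spawn_task<F>(body: F) -> JoinHandle<()>
where
    F: FnOnce(&mut TaskContext) + Send + 'static,
{
    thread::spawn(move || {
        let mut ctx = TaskContext { children: Vec::new() };
        // Run the main body of the task.
        body(&mut ctx);
        // Wait for all child tasks that have not yet finished.
        for child in ctx.children {
            child.join().expect("child task panicked");
        }
        // Returning here is what marks this task's JoinHandle as done.
    })
}

/// Returns the order in which completion events were recorded.
fn completion_order() -> Vec<&'static str> {
    let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
    let parent_log = Arc::clone(&log);
    let parent = spawn_task(move |ctx| {
        let child_log = Arc::clone(&parent_log);
        ctx.spawn_child(move |_| {
            child_log.lock().unwrap().push("child done");
        });
        parent_log.lock().unwrap().push("parent body done");
    });
    parent.join().expect("parent task panicked");
    log.lock().unwrap().push("parent joined");
    let order = log.lock().unwrap().clone();
    order
}
```

The relative order of "child done" and "parent body done" is not fixed, but "parent joined" is always recorded last, even though the parent body never joins the child itself.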

The second and third approach have the benefit of being able to support async cancellation. There will be discussion about async cancellation in one of the next comments around cancellation.

They however also raise the question of when a child task's handle should get removed from the parent task's list, if the parent task never returns (e.g. a server's accept task). Maybe when join on the task's handle is explicitly called?

The second approach seems like it separates the concerns of a task and a group of tasks better, while in the third approach they are the same thing. I definitely prefer the separation of concerns. However the third approach seems like it might be able to provide stronger guarantees of allowing tasks to run to completion, since there is no TaskGroup object which can get randomly dropped by users. I guess if we go for the TaskGroup, then dropping it might lead to force cancellation for the remaining tasks in it. Or we could still have TaskGroup, but it's an implicit member of each task, which then gets awaited by the runtime.

Error propagation

All approaches somehow raise the question of whether and how errors from child tasks can be propagated. In Rust we can not throw an arbitrary exception. In the second and third approach we might be able to add some constraints that all tasks need to return a Result<T, E>. Child tasks' errors must be convertible into the parent task's or TaskGroup's error type via .into() - or they need to get boxed. I think with the TaskGroup type and the second approach error propagation might be easiest to support, since we have an explicit thing to .await. That thing can return the error.

When using the first approach we could likely only return a generic CancellationError if the child task gets dropped.
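
The conversion constraint can be sketched with scoped threads standing in for tasks. ChildError, GroupError, child and group are hypothetical names for this example. The group's error type implements From for the child's error type, so `?` converts the error on the way out of the group.

```rust
use std::thread;

#[derive(Debug, PartialEq)]
enum ChildError {
    Timeout,
}

#[derive(Debug, PartialEq)]
enum GroupError {
    Child(ChildError),
    Panicked,
}

// This impl is what lets `?` (which calls `.into()`) lift a child error
// into the group's error type.
impl From<ChildError> for GroupError {
    fn from(err: ChildError) -> Self {
        GroupError::Child(err)
    }
}

/// Hypothetical child task: returns 1 or fails with a timeout.
fn child(fail: bool) -> Result<u32, ChildError> {
    if fail { Err(ChildError::Timeout) } else { Ok(1) }
}

/// Runs two children and combines their results into the group's result.
fn group(fail_second: bool) -> Result<u32, GroupError> {
    thread::scope(|scope| -> Result<u32, GroupError> {
        let first = scope.spawn(|| child(false));
        let second = scope.spawn(move || child(fail_second));
        // First `?`: a panicked child. Second `?`: ChildError -> GroupError.
        let a = first.join().map_err(|_| GroupError::Panicked)??;
        let b = second.join().map_err(|_| GroupError::Panicked)??;
        Ok(a + b)
    })
}
```

Here the scope itself is the "explicit thing" that returns the error: group(true) yields Err(GroupError::Child(ChildError::Timeout)).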

Requirements for supporting erroring tasks

In order to make this work, we need to extend Tasks from having no return types (and thereby JoinHandle returning ()) to supporting return values (incl. Result). I think this would be a good change to have anyway: A lot of code needs to return something from a task and currently uses Oneshot channels for it. Direct support for return values could likely be more efficient.

@njsmith

njsmith commented Dec 2, 2019

Some notes:

I think Go's errgroup actually predates that blog post. (As do Erlang's supervisors, Dijkstra's parbegin/parend, rayon::scope etc., and I think Martin Sústrik was the first to use the phrase "structured concurrency".) I think the basic concepts are fundamental enough that people keep rediscovering them from different directions. The novel contributions in that post were to draw a line around the concepts, argue that we should try to systematically eliminate unstructured primitives, and suggest the nursery concept as a structured primitive that might be flexible enough to do that.

There's a long discussion of Rust/futures/structured concurrency here: https://trio.discourse.group/t/structured-concurrency-in-rust/73. I don't know if any of it's directly relevant, but you might find it interesting at least.

I think the core requirements for structured concurrency are:

I'd say the core requirement is: a function can't spawn tasks that outlive the function's lifetime, unless explicitly granted that permission. That's the point of reifying nurseries. So you want to be careful here:

A parent task will only finish once all child tasks have finished

I'm not an expert on tokio, but in most systems, a "task" is a larger unit of work than a single function frame. So if a function can freely spawn a child task whose lifetime is bounded to the parent task, then that can violate the structured concurrency rule, because the task lifetime is larger than the lifetime of the function inside it.

@Matthias247
Contributor Author

Cancelling child tasks

Rust is in the luxurious situation that we can perform cancellation in a couple of ways:

  1. Forceful cancellation: Parent tasks could simply drop their child tasks
  2. Graceful/cooperative cancellation: Child tasks get signaled that they are cancelled. The signal allows them to continue to run, but every method which is aware of the cancellation signal will return early with a cancellation error, which leads the task to finish. In order to support graceful cancellation we can e.g. utilize a data structure similar to ManualResetEvent in futures-intrusive, which can be signaled a single time and is asynchronously awaitable by an unlimited number of tasks.

Graceful cancellation has some benefits:

  • Child tasks can detect their cancellation and execute some potentially async cleanup logic that is only known to them. That is similar to the AsyncDrop concept, but code can just be written using normal async/await syntax instead of in a handwritten state machine form. This will not guarantee that code will run to completion, and can thereby not be used for 100% deterministic cleanup. But that limitation is fundamental to the Futures model, and can also not be changed with the proposed AsyncDrop. Both mechanisms just provide a way to execute cleanup code as long as no higher level component performs a hard cancellation.
  • Child tasks have the chance to explicitly return an Err variant of their choice, which we could later on utilize as the result of our parent task. With forced cancellation, we can not return more specialized errors.
  • It might be less prone to accidental errors for code which has not correctly taken cancellation at arbitrary await points into account. This might of course still happen in other places where combinators drop suspended Futures, but if the runtime doesn't automatically force-cancel tasks there would be fewer such situations.

It obviously also has the drawback that cancellation signals might not be respected, or might not be acted on in a timely manner.

I personally favor graceful cancellation.

In order to support graceful cancellation a CancellationToken could be stored in each task that is accessible inside the task's body. The token could either be passed explicitly as a parameter, or implicitly be available as a TLS variable.

The explicit version could look like:

async fn subtask(cancel_token: &CancellationToken) {
    select! {
        result = execute_other_logic() => {},
        _ = cancel_token => {
            // Subtask is being cancelled, but can still continue to execute code
        }
    }
}

spawn(subtask);
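
For comparison, the explicit variant can be approximated with plain threads and an atomic flag. This is a minimal sketch under stated assumptions: CancellationToken, Cancelled, subtask and run are hypothetical and only model the idea. The async version would await the token instead of polling it between units of work.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical token: signalled once, observable by any number of tasks.
#[derive(Clone, Default)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

/// Error a subtask chooses to return when it notices the cancellation.
#[derive(Debug, PartialEq)]
struct Cancelled;

/// Works through `steps` units of work, checking the token between units.
fn subtask(cancel_token: &CancellationToken, steps: u32) -> Result<u32, Cancelled> {
    for _ in 0..steps {
        if cancel_token.is_cancelled() {
            // The subtask is being cancelled, but could still run
            // cleanup code here before returning.
            return Err(Cancelled);
        }
        thread::sleep(Duration::from_millis(1));
    }
    Ok(steps)
}

/// Spawns the subtask, optionally cancels it, and returns its result.
fn run(steps: u32, cancel: bool) -> Result<u32, Cancelled> {
    let token = CancellationToken::default();
    let child_token = token.clone();
    let child = thread::spawn(move || subtask(&child_token, steps));
    if cancel {
        token.cancel();
    }
    child.join().expect("subtask panicked")
}
```

A subtask that is never cancelled returns Ok, while run(u32::MAX, true) returns Err(Cancelled) shortly after the token is signalled, with an error value chosen by the subtask itself.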

The implicit version would get the CancellationToken e.g. via tokio::current_task().cancel_token, and could select! on that. Here again both versions have their pros and cons.

With the implicit version we can directly listen to cancellation signals in low level tokio types like sockets and timers, and thereby basically guarantee that cancellation is respected - even if the user would not explicitly forward the parameter. However it's obviously more magic, and might make it harder to standardize the mechanism.

When cancelling tasks we might have situations where we only want to cancel one particular subtask (e.g. as discussed in #1830), or one where we want to cancel all subtasks - e.g. because our server is shutting down. Both mechanisms should be efficient - which ideally means cancelling N child-tasks should not require actively initiating N cancellations internally by iterating through the list. I have some idea how to allow for this, but that's more for the details.

@Matthias247
Contributor Author

Managing child/parent relations

As mentioned earlier, a list of child tasks can likely be stored by having an [intrusive] list of child tasks in the parent, or the TaskGroup. In order to retrieve information about the parent, we can

  • either provide it to each child task as a parameter on startup and require users to propagate the information
  • or allow access to task information via TLS. The runtime will populate the required TLS fields. This is somewhat equivalent to Kotlin's coroutineContext.

@kellytk
Contributor

kellytk commented Dec 3, 2019

Would the functionality provided by https://github.com/bastion-rs/bastion (https://docs.rs/bastion/0.3.1/bastion/) qualify as structured concurrency?

@Matthias247
Contributor Author

@kellytk

Would the functionality provided by https://github.com/bastion-rs/bastion (https://docs.rs/bastion/0.3.1/bastion/) qualify as structured concurrency?

Maybe :) As a contributor you might be able to answer this question best. I just skimmed the docs and I'm not yet sure whether I fully figured out what it provides. The child hierarchies are certainly something similar. But it also seems very much inspired by an actor model - and what Tokio would offer is likely more general.

@Matthias247
Contributor Author

I haven't found much time to work on this so far. But here are some further thoughts:

Waiting for child tasks to complete

I don't think anymore that

Option 3: The runtime can await all non-joined child tasks before it signals the parent task as finished/joinable.

is something to pursue. One thing that I don't like is that it doesn't allow for multiple "scopes" inside a Task, within which we want to constrain concurrency.

E.g. we couldn't do something along the lines of

async fn some_method() {
    while !done {
        SCOPE_START
        // spawn child tasks here
        SCOPE_END
        // all child tasks are guaranteed to have finished here
    }
}

since the scope is automatically the task.

It also makes it hard to pass scope handles around, since the task is not a value. It could only be addressed via functions which act on thread-locals, which are better avoided.

WaitSet

Therefore I now think

Option 2: We can add an explicit TaskGroup/TaskSet/TaskScope/CoroutineScope/Nursery type, which acts as a parent for all child tasks.

is the preferred approach, which I will investigate further. @carllerche proposed calling it WaitSet, so I will just refer to it that way in all further text.

An exemplary usage was already provided earlier:

async fn parent_task() {
    WaitSet::new(|wait_set| async {
        let join_handle = wait_set.spawn(async {
             doSth().await;
        });
    }).await; // This waits for all child tasks to complete
}

This is the most basic example. Creating a WaitSet will create a scope, inside which tasks can be started. No body of a child task can outlive the WaitSet, because awaiting the WaitSet will wait for the tasks to complete. Users will however have the ability to await the child tasks inside the scope - e.g. if they are interested in the return value.

The JoinHandle returned by WaitSet::spawn would need to have a lifetime of 'wait_set. It can not outlive the Set or be moved out of it.

What else can we win

Before going deeper into some implementation questions, here is something that already this most simple version of the WaitSet could enable:

It could allow borrowing of values on the parent task inside subtasks in the WaitSet, in a similar fashion as scoped threads (e.g. in crossbeam)

async fn parent_task() {
    let data = [0u8; 1024];
    WaitSet::new(|wait_set| async {
        for i in 0 .. 10 {
            wait_set.spawn(async {
                 write_data_to_receiver(i, &data[..]).await
            });
        }
    }).await; // This waits for all child tasks to complete
}

since the child tasks would have a lifetime which is smaller than the lifetime of WaitSet.
This is obviously already possible with select! and join!, but those will not scale to as many tasks as spawning real child tasks does, and they do not support dynamically adding new child tasks of a heterogeneous nature.
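
For reference, this is what the equivalent borrowing looks like today with scoped threads from the standard library (std::thread::scope, the standard library counterpart of crossbeam's scoped threads). write_data_to_receiver is a hypothetical stand-in that just reports how many bytes it was given.

```rust
use std::thread;

/// Hypothetical stand-in for sending `data` to a receiver; returns the
/// number of bytes it was handed.
fn write_data_to_receiver(receiver: usize, data: &[u8]) -> usize {
    let _ = receiver;
    data.len()
}

/// Lets ten child threads borrow a buffer that lives on the parent's stack.
fn parent_task() -> usize {
    let data = [0u8; 1024];
    thread::scope(|scope| {
        let mut handles = Vec::new();
        for i in 0..10 {
            // Each child only gets a shared reference, no copy and no Arc.
            let data = &data;
            handles.push(scope.spawn(move || write_data_to_receiver(i, &data[..])));
        }
        // The scope would wait for the children anyway; joining explicitly
        // is only needed to read their return values.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("child panicked"))
            .sum::<usize>()
    })
}
```

The borrow is accepted by the compiler because the scope is guaranteed to join all children before `data` goes out of scope, which is the same guarantee a WaitSet would have to give for tasks.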

What happens if the JoinHandle would get dropped

There are a couple of options here:

  1. Force-stop the task by dropping the Future that it wraps
  2. Allow the task to continue to run. It is still tracked by the WaitSet and will automatically be awaited when the WaitSet is awaited.
  3. Signal a graceful cancellation request to the child task, and let it continue to run. It will also be awaited when the WaitSet is awaited. However due to the graceful cancellation request it might finish earlier.

Option 1 at first sight doesn't look too desirable, because we want to offer a solution which favors mechanisms for graceful cancellation over forced cancellation.

It was mentioned that some users might find it surprising that their task automatically stops if they don't store the handle. It is a common novice error in some other frameworks (e.g. here for Boost.Asio) to forget to store some handles, which then leads to operations automatically cancelling.
On the other hand it was also mentioned that #[must_use] can help prevent this error in Rust to a certain extent.

Option 2 is closest to current tokio::spawn.

It avoids the "surprising cancellation" issue. And obviously trades it in for the fact that an invisible task continues to run. However the task is still tracked by the WaitSet.

For tasks where the user is never interested in the return value Option 2 is certainly the most ergonomic option. E.g. we could imagine that a WaitSet is created when a certain Http/WebSocket/etc connection is opened, and inside it a task is created which logs some metrics every couple of seconds. There is no real benefit on waiting for it - it doesn't have a result. We just want to make sure it's done and doesn't waste resources anymore when the connection is closed.

Option 3 will require a builtin mechanism to signal a graceful cancellation request to a single task (instead of all tasks in the WaitSet) - which we might or might not need for other reasons. If other parts of the WaitSet also prefer graceful cancellation, doing the same here might make things more consistent. However I think Option 3 could be even more surprising than Option 1. For the latter users would encounter the issue immediately and can fix it. With Option 3 they might only see it once they add any component which actually respects the cancellation request and terminates the task.

So far I am preferring Option 2 (allow the task to continue to run) for these reasons. But I think we have to look at a few more things before making a decision. We will look at further extensions of the WaitSet, where child tasks can return Results and a failure in one child-task would need to lead to a graceful cancellation and error report of other tasks. The ideas here should somehow fit into that.

What happens if the WaitSet would get dropped

Since the WaitSet is just an object instance on a callstack in an async fn and thereby a Future, it can be dropped while it is still executing the child tasks inside it. Since bare Futures require synchronous cancellation, we are restricted to synchronous operations in WaitSet::drop() when an async fn that makes use of it gets forcefully cancelled.

I think there are only 2 things we can do here which do not violate the structured concurrency principle:

  • Synchronously wait (block) for all remaining tasks to finish. This would block the scheduler. If one of the awaited child tasks would drop yet another WaitSet the problem gets even bigger - even though it might still be feasible due to at least having an acyclic task graph. But I think it isn't feasible to guarantee good behavior in a general purpose framework like tokio and is therefore not a real option.
  • Forcefully drop/cancel all remaining child tasks in the WaitSet. This has the downside that spawned tasks might now need to support 2 cancellation mechanisms (forceful and graceful). But on the other hand "good" async code will need to support forceful cancellation anyway.

Therefore I think we need to go with the latter option and drop all remaining tasks. Many applications might not need to handle this case perfectly, since they have control over all their tasks and can avoid WaitSets getting dropped, in a similar fashion to how they can try to avoid panics.

@Matthias247
Contributor Author

I just had a discussion with @carllerche whether borrowing from the parent task like in the following code is safe:

async fn parent_task() {
    let data = [0u8; 1024];
    WaitSet::new(|wait_set| async {
        for i in 0 .. 10 {
            wait_set.spawn(async {
                 write_data_to_receiver(i, &data[..]).await
            });
        }
    }).await; // This waits for all child tasks to complete
}

The concern here is that due to the use of mem::forget the child task could continue to run and access invalid memory, while the parent task has already completed.

In my understanding the mechanism is safe due to the following reasons:

  • Data which gets borrowed is on an async "stack". Therefore it's part of a Future. That Future needs to have been pinned before the WaitSet code is executed. Otherwise the code inside the WaitSet scope would not start to run.
  • Whoever pinned the code did promise not to deallocate that Future's memory without calling drop on it before. This is described in the Drop guarantee of Pin.
  • If drop is called on the Future that represents the parent task, the drop method of the WaitSet would be called. It is required to stop all pending tasks, via the mechanism described in Structured Concurrency Support #1879 (comment)
  • There exists no mechanism to forget the WaitSet. The code inside it either gets started and runs till await or drop, or it is never started. Even if a user does
    let data = [0u8; 1024];
    let wait_set = WaitSet::new(|wait_set| {
        wait_set.spawn(async {
            write_data_to_receiver(&data[..]).await
        });
    });
    std::mem::forget(wait_set);
    nothing bad would happen, since the child tasks would not yet get started without the .await. Basically we would need to guarantee that WaitSet::new creates a Future, and that everything in the scope is only executed when that Future is guaranteed to be pinned.

I am not so sure about a WaitSet without the mandatory scope. E.g.

let data = [0u8; 1024];

let wait_set = WaitSet::new();
wait_set.spawn(async {
    write_data_to_receiver(&data[..]).await
});
std::mem::forget(wait_set)

If the spawn here is immediately executed, then this does not seem to be safe. And if it would not be immediately executed, the API would not be too helpful, since it would then just be an accumulator for tasks which get started later.

As far as I understand it also wouldn't be safe if spawn used Pin<&mut self> as receiver instead of &mut self, because the following still seems invalid to me:

let wait_set = WaitSet::new();
{
    let data = [0u8; 1024];
    pin_mut!(wait_set);
    wait_set.spawn(async {
        write_data_to_receiver(&data[..]).await
    });
    std::mem::forget(wait_set)
}

But maybe one of our pinning experts (@Nemo157, @RalfJung) could chime in and correct me on everything I've written :)

@najamelan
Contributor

I am very happy to see discussion about this, but I would love for this not to be considered a tokio issue. This concerns the whole rust async ecosystem and if some sort of solution comes forth, it would be nice to not lock people into using tokio in order to benefit from it. I would love to see solutions which are executor agnostic.

@carllerche
Member

@najamelan I believe that an implementation would require access to runtime internals

@Matthias247
Contributor Author

(Error) return values

This is mostly based on the last comments, and assumes we will start with a WaitSet or scope like this:

async fn parent_task() {
    let result = task::scope(|scope| async {
        let join_handle = scope.spawn(async {
             doSth().await;
        });
        let join_handle_2 = scope.spawn(async {
             doSth().await;
        });

        join_handle_2.await
    }).await; // This waits for all child tasks to complete
}

Based on this assumption, the remaining questions to clarify are:

  • How do we cancel child tasks we are no longer interested in? E.g. in this example the task identified by join_handle is not explicitly awaited. But structured concurrency expects us to wait until it has finished before the scope exits. So how can we make this happen?
  • Should there be any inbuilt handling of Results, with the following notion: If one of the child tasks fails with an Err
    • The remaining tasks should be cancelled
    • The result of the complete scope would be this error
  • Should we force spawned tasks to return a Result?
  • What would happen if the last statement already completed with no error and the scope is exited, but then one of the other non-awaited tasks yielded an error?

I will answer the last question first, because that was the one where I found my personal answer first: I think we should not silently exchange return values outside of the user's view. This would be very surprising, and typically not what the user wants. Instead, if the block inside the scope finishes

  • we just return the return value from the block as typical
  • we signal cancellation to any outstanding tasks and await them. However we do not care any longer about their results.

After having thought about this, I continued to wonder whether we can still allow the following use-case:

  • A user spawns a number of subtasks. E.g. they want to upload a file to N servers, and use a task for each
  • If any of the uploads fails, the operation is failed. The result of the scope is a failure, and the outstanding tasks can be cancelled.

It then occurred to me that this actually works quite naturally due to how ? works in Rust:

  • If the user uses .await? on a Future<Output=Result<T, Err>>, then the error will automatically become the error return value of the scope
  • AND since ? triggers an early return from the scope, the scope would automatically cancel all remaining outstanding tasks and just wait for them to complete before the error is forwarded to the .awaiter of the scope.

This means I think we would need no special handling for Results. Users are just able to spawn child tasks, either using Result as return type, which enables them to short-circuit using ?, or any other return value (which would need to be manually handled by the user).

So as a more complete example of the upload files task, I expect users to be able to do:

let result = task::scope(|scope| async {
    let upload_task_handle_1 = scope.spawn(async {
        // No trailing semicolon: the Result is the task's return value
        doUpload().await
    });
    let upload_task_handle_2 = scope.spawn(async {
        doUpload().await
    });

    try_join!(upload_task_handle_1, upload_task_handle_2)
}).await?;

Careful readers will notice the code looks pretty much the same as if users would have utilized a try_join! block which performs all uploads in the same task. But here we are actually using multiple tasks to do the job, which avoids unnecessary polling of unrelated tasks.
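For readers who want to try the short-circuit behaviour without the proposed API, here is a rough synchronous analogy using std::thread::scope. It is only a sketch: upload_all, the server names and the failing index are made up for illustration, and threads stand in for tasks. The point it shows is that an early return via ? still lets the scope wait for every child before the error reaches the caller.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: "uploads" to every server on its own thread.
/// Returns the scope's result and the number of children that finished.
fn upload_all(servers: &[&str], failing: Option<usize>) -> (Result<(), String>, usize) {
    let finished = AtomicUsize::new(0);

    let result = thread::scope(|scope| -> Result<(), String> {
        let mut handles = Vec::new();
        for (i, name) in servers.iter().enumerate() {
            let finished = &finished;
            handles.push(scope.spawn(move || {
                // Stand-in for the real upload work.
                let outcome = if Some(i) == failing {
                    Err(format!("upload to {name} failed"))
                } else {
                    Ok(())
                };
                finished.fetch_add(1, Ordering::SeqCst);
                outcome
            }));
        }

        for handle in handles {
            // `?` leaves the closure on the first error. The scope
            // still joins all remaining children before returning.
            handle.join().expect("child panicked")?;
        }
        Ok(())
    });

    (result, finished.load(Ordering::SeqCst))
}
```

Calling upload_all(&["a", "b", "c"], Some(0)) yields the error of the first child, while the counter shows that all three children ran to completion before the scope returned.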

The tasks will also feature a graceful cancellation facility, which is part of the next post.

@Matthias247
Contributor Author

Cancellation

So the last piece of the puzzle is cancellation. Here there are 2 main questions to answer

  1. Should tasks be individually cancellable, or is it sufficient if we cancel all tasks within a scope at once?
  2. Do we want to offer graceful cancellation, forced cancellation, or both?

Scope cancellation vs individual task cancellation

The approach as I outlined it so far has actually no real need for offering per-task cancellation. It is sufficient if all remaining tasks get cancelled as soon as the return from the scope happens. Therefore a per-task cancellation would be an additional feature.

Benefits of per-task cancellation:

It makes the cancellation path more consistent. If we offer graceful cancellation (via some kind of CancellationToken) for scope exits, it would feel wrong to offer only forced cancellation inside the scope. Forced cancellation would not even work if we do not cancel the task by dropping the JoinHandle - which was my preferred option as outlined in #1879 (comment)

Drawbacks of per-task cancellation

We would need to track cancellation state per task. At the end of a scope we would likely need to iterate over all pending tasks in order to signal them the cancellation request. Compared to this cancelling a shared CancellationToken would be an O(1) operation.

I personally think we should therefore offer per-task cancellation. It's likely we can also discover a way to cancel N tasks without iterating over all of them, e.g. with some CancellationToken which takes an individual source into account as well as a group source. But that could be a follow-up optimization.
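One possible shape for such a token, as a rough sketch: each token checks its own flag and a flag shared by the whole group, so cancelling the group is a single store regardless of the number of tasks. CancelGroup and CancelToken are hypothetical names, and this version is polling-only. A real token would also need to wake tasks that are waiting on it.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical group source shared by all tasks of one scope.
struct CancelGroup {
    flag: Arc<AtomicBool>,
}

/// Hypothetical per-task token: cancelled if either source fired.
#[derive(Clone)]
struct CancelToken {
    group: Arc<AtomicBool>,
    own: Arc<AtomicBool>,
}

impl CancelGroup {
    fn new() -> Self {
        CancelGroup { flag: Arc::new(AtomicBool::new(false)) }
    }

    /// Creates a token with a fresh individual flag.
    fn token(&self) -> CancelToken {
        CancelToken {
            group: Arc::clone(&self.flag),
            own: Arc::new(AtomicBool::new(false)),
        }
    }

    /// O(1): one store, no iteration over the tasks.
    fn cancel_all(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }
}

impl CancelToken {
    /// Cancels only the task holding this token (or its clones).
    fn cancel(&self) {
        self.own.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.own.load(Ordering::SeqCst) || self.group.load(Ordering::SeqCst)
    }
}
```

The cost moves to the check side: every is_cancelled call reads two flags instead of one.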

Graceful cancellation vs forced cancellation

I think we should offer graceful cancellation, because it's a superset of functionality, and users could still perform forced cancellation by using an adapter like

task::scope(|scope| {
    scope.spawn(|cancel_token| async {
        select! {
            // Cancellation was signalled: the other branch gets dropped
            _ = cancel_token => { /* return cancellation error */ },
            result = actual_async_fn() => result,
        }
    })
})

In this case the Future inside the task would just get force-dropped if a graceful cancellation is signalled - so the result is the same. However users can also continue to run a bit of code - e.g. in order to execute cleanup logic.

tokio can also offer a helper function like scope.spawn_with_forced_drop(future) which does the same by default if users do not care.

The main drawback of graceful cancellation that I see is that it's not well integrated into the other code - e.g. not on Tokio sockets and co. If a user ignores a cancellation request, the socket will not automatically abort, and the connection might stick around for a while even if the task was already cancelled a while ago. So this is an additional source of errors.

But on the other hand we could also call it a feature, since a cancelled task would not automatically lead to everything being abandoned at any IO point, but to tasks only exiting at points that have been explicitly designated by the application developer.
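As an illustration of "exit only at designated points", here is a small synchronous sketch. The names (send_chunks, Outcome, the "abort" cleanup message) are invented for the example. The worker checks the cancellation flag once per chunk, and on cancellation it still gets to run cleanup before returning.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum Outcome {
    Completed { sent: usize },
    Cancelled { sent: usize },
}

/// Sends chunks one by one through the `send` callback.
/// Cancellation is only observed between chunks.
fn send_chunks(chunks: &[&str], cancel: &AtomicBool, mut send: impl FnMut(&str)) -> Outcome {
    let mut sent = 0;
    for &chunk in chunks {
        // Designated cancellation point.
        if cancel.load(Ordering::SeqCst) {
            // Graceful path: cleanup runs before the task exits.
            send("abort");
            return Outcome::Cancelled { sent };
        }
        send(chunk);
        sent += 1;
    }
    Outcome::Completed { sent }
}
```

A forced cancellation would correspond to dropping the work in the middle of send, with no chance to emit the final "abort".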

@jonhoo
Contributor

jonhoo commented Dec 18, 2019

I actually played around a decent amount with graceful termination over in https://docs.rs/stream-cancel/, though specifically in the context of streams. There was no consideration there of something akin to AsyncDrop. The select! you have here I encapsulated in the Valved combinator, and that's probably something Tokio could also provide.

@pandaman64

I'm concerned about the possibility of the Leakpocalypse:

let mut t = Box::pin(task::scope(|scope| {
    // spawn a task
    scope.spawn(async { ... });
    // do other stuff
}));
futures::poll!(t.as_mut()); // the spawned task starts running
mem::forget(t);

Since we can forget the result of it without dropping nor canceling the passed future, the spawned tasks might outlive the parent.

@carllerche
Member

@pandaman64 I still need to read the entire thing in depth, but I believe you are correct. The proposed API would not support borrowing in a spawn and I don't know of an obvious way to make it work w/o blocking the thread owning the task.

@carllerche
Member

That said, I believe that we could support borrowed spawns from a blocking context. So, from outside of the runtime (or from a spawn_blocking) you could enter a scope and spawn N tasks that borrow.
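For comparison, this is what borrowing from a blocking scope looks like with std::thread::scope (stable since Rust 1.63). It uses threads rather than Tokio tasks, so treat it as an analogy for the blocking variant and not as the proposed API. parallel_sum is a made-up example function.

```rust
use std::thread;

/// Sums both halves of `data` on separate threads that borrow it directly.
fn parallel_sum(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);

    // The calling thread blocks here until both children are done,
    // which is what makes the non-'static borrows sound.
    thread::scope(|scope| {
        let left_sum = scope.spawn(|| left.iter().sum::<u64>());
        let right_sum = scope.spawn(|| right.iter().sum::<u64>());

        left_sum.join().expect("left worker panicked")
            + right_sum.join().expect("right worker panicked")
    })
}
```

The scope function only returns after all children have been joined, and it is a plain function call and not a value that could be forgotten halfway through.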

@Matthias247
Contributor Author

@jonhoo Providing wrappers that handle graceful cancellation is definitely an option! The nice thing about select!-ing is that it will work with any kind of Future or Stream in any situation. But the ability to encapsulate the behavior into types sounds good. And we could certainly offer both.

@pandaman64

I'm concerned about the possibility of the Leakpocalypse:

I think especially a graceful cancellation mechanism has some potential to lead to unexpected leaks, and we need to trade it off. But I also think we can maybe add runtime diagnostic mechanisms to discover those spots. In a similar fashion as it might be possible to use those to detect accidentally blocking tasks. But that's for another discussion.

I'm not concerned about your given example leaking, because

  • using mem::forget on any non-primitive type basically requests to leak something
  • it's not different from creating a Future, polling it once and forgetting it. Or from spawning a thread which doesn't terminate itself and forgetting about it

Since we can forget the result of it without dropping nor canceling the passed future, the spawned tasks might outlive the parent.

Indeed it can in this case! Which is a bit of a bummer. I guess we can agree this is a contrived example (why would you ever do this?), and that it's actually questionable whether there is a parent task at all in this case (you just kick off something via poll once, which is mostly decoupled from any task state). But we should still make sure it is fully safe.

Based on my understanding I do see the violation of the structured concurrency concept, but not yet the potential memory issue that @carllerche is concerned about. I think the child tasks here are only allowed to capture data with a lifetime that outlives the 'scope. Since you put the scope in a Box, its lifetime becomes 'static and captured data would need to be too. But we might need to get some more confirmation for this.

I was initially wondering whether the violation could be fixed by requiring that a child scope always borrows its parent scope (which would require us to provide a scope from block_on and other runtime entry methods). That way at least the lifetime of the created Box future would be limited to the lifetime of the parent. But I guess that doesn't yet prevent anyone from forgetting it.

@pandaman64

Yeah, it's a contrived example. Typically users will await the scoped future (or at least drop it), so it's not an issue.
As for the soundness, I think if the spawned task can refer to the variables in the parent scope (which crossbeam::scope allows), then the leakage will lead to a use after free.

@pandaman64

pandaman64 commented Dec 20, 2019

I made a prototype based on crossbeam::scope (except drop and exception handling, which requires task cancellation).
My idea is that we expose only a scope! macro, which awaits the returned future immediately. In this way, the task is "blocked" until scope! ends.

Edit: I agree that scope_impl should be unsafe (it's already hidden in doc I think?). Committed a fix.

@Matthias247
Contributor Author

Matthias247 commented Dec 20, 2019

@pandaman64 Nice work! Thanks for the quick prototyping.

The macro might definitely be an option if it doesn't work out otherwise. I think macros still require that the functions they refer to are public, but we could at least make the implementation unsafe and hide it.

Edit: The following comment was wrong. The program indeed compiles and allows using parent data when the scope gets boxed and forgotten.

Fixed code which runs the child task:

{
    let data = vec![1u32, 2, 3];
    let data_ref = &data;
    
    {
        let mut scope_fut = Box::pin(scope_impl(handle.clone(), |scope| async move {
            scope.spawn(async move {
                println!("Hello from 2nd scope");
                println!("Data: {:?}", data_ref);
                delay_for(Duration::from_millis(500)).await;
                println!("End from 2nd scope");
                println!("Data: {:?}", data_ref);
            });
            5u32
        }));
        futures::poll!(scope_fut.as_mut());
        std::mem::forget(scope_fut);
    }

    println!("Dead scope");
}

However I'm not sure whether it's actually needed, or whether the lifetimes do not already fulfill the job. If I add the following to your example it leads to a compilation error:

let data = vec![1, 2, 3];
let data_ref = &data;

let mut scope_fut = Box::pin(scope_impl(handle.clone(), |scope| async move {
    scope.spawn(async {
        println!("Hello from 2nd scope");
        println!("Data: {:?}", data_ref);
        delay_for(Duration::from_millis(500)).await;
        println!("End from 2nd scope");
        println!("Data: {:?}", data_ref);
    });
    5u32
}));

futures::poll!(scope_fut.as_mut());
std::mem::forget(scope_fut);

yields the following error:

error[E0597]: `data_ref` does not live long enough
   --> examples/scoped.rs:149:44
    |
146 |                let mut scope_fut = Box::pin(scope_impl(handle.clone(), |scope| async move {
    |                                                                         ----- lifetime `'1` appears in the type of `scope`
147 |                    scope.spawn(async {
    |   _________________-_________________-
    |  |_________________|
    | ||
148 | ||                     println!("Hello from 2nd scope");
149 | ||                     println!("Data: {:?}", data_ref);
    | ||                                            ^^^^^^^^ borrowed value does not live long enough
150 | ||                     delay_for(Duration::from_millis(500)).await;
151 | ||                     println!("End from 2nd scope");
152 | ||                     println!("Data: {:?}", data_ref);
153 | ||                 });
    | ||_________________-- argument requires that `data_ref` is borrowed for `'1`
    |  |_________________|
    |                    value captured here by generator
154 |                    5u32
155 |                }));
    |                - `data_ref` dropped here while still borrowed

Full code

This seems to be what we want? Or is there yet another loophole in there?

@udoprog
Contributor

udoprog commented Dec 20, 2019

@pandaman64 I had a similar idea to yours to support spawn_blocking with scoped references.

It would be built on top of the existing spawning API, with the following additions (and the added ability to cancel tasks):

unsafe fn spawn_scoped<'a, F, R>(func: F) -> Handle<'a, R>
where
    F: FnOnce() -> R + Send + 'a;

macro_rules! spawn_scoped {
    ($func:expr) => { unsafe { spawn_scoped($func) }.await }
}

The snags I could think of (and I believe they apply to yours as well) are that the Drop implementation for Handle would have to either cancel (as you mentioned above) or join the task if it is running, to maintain its safety invariants. That could cause blocking if the handle is embedded in another future that drops it.
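To illustrate the "join in Drop" option and its cost, here is a minimal sketch with a std thread standing in for the spawned task. JoinOnDrop is a hypothetical wrapper, not the Handle type from the PoC. The drop call blocks the current thread until the worker has finished.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle that joins its worker when dropped.
struct JoinOnDrop(Option<JoinHandle<()>>);

impl Drop for JoinOnDrop {
    fn drop(&mut self) {
        if let Some(handle) = self.0.take() {
            // Blocks the dropping thread until the worker is done.
            let _ = handle.join();
        }
    }
}

/// Returns whether the worker had finished by the time `drop` returned.
fn worker_finished_before_drop_returns() -> bool {
    let done = Arc::new(AtomicBool::new(false));
    let worker_done = Arc::clone(&done);

    let guard = JoinOnDrop(Some(thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        worker_done.store(true, Ordering::SeqCst);
    })));

    drop(guard); // Waits roughly 20 ms here.
    done.load(Ordering::SeqCst)
}
```

Inside an async task the same wait would stall an executor thread for the duration, which is the blocking concern described above.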

Edit:
Did an incomplete and unsound PoC here just to see if the API would work.
It includes a breaking API change for simplicity, cancellation is not implemented, and all of the spawn API would need to be marked unsafe which it isn't right now:
udoprog@ba7f91c
And here's testing it, with some documentation on safety:
udoprog@spawn-scoped

Not sure if this approach could be made fully sound (edit: especially in the face of a safe mem::forget). And there is still the issue of needing to block in the destruction of JoinHandle if it's running.

@pandaman64

Yeah, the scoped API must wait for all spawned tasks to reach a cancellation point (when poll returns). So my scope! may block in an async context in the case of panic and Drop, too.

@Matthias247
Contributor Author

Matthias247 commented Dec 22, 2019

@pandaman64
After reading the comment here I am no longer sure whether the macro can prevent the issue, since it can be "mitigated" by just wrapping everything in another async block:

let data = vec![1u32, 2, 3];
let data_ref = &data;

let mut scope_fut = Box::pin(async {
    scope!(handle.clone(), |scope| async move {
        scope.spawn(async move {
            // Invalid access
            println!("Data: {:?}", data_ref);
        });
        5u32
    })});
futures::poll!(scope_fut.as_mut());
std::mem::forget(scope_fut);

I think here the borrowing lifetime would need to be restricted to not be able to borrow things outside of Box::pin. But I'm not sure whether that is feasible. I was also a bit wondering whether Box::pin is actually allowing too much here, and whether it would be good to have it restricted to not box items which are 'static. But given it is just a combination of Box::new and Pin::new that would be hard to argue.

Proposals on what we can do regarding cancellation (force-cancel or block the thread) are already higher up in the thread. I'm not too worried about those at the moment.

The whole mem::forget story however seems super-sad, because it still seems a misuse of this feature (which is more than unlikely to ever happen in practice and even by accident) could prevent a really nice and safe API.

@udoprog I'm not sure if I understand it correctly. But isn't this even closer to the old scoped threads API in the synchronous world, where the problem was even bigger - since people could leak or forget the JoinHandles?

@rnarubin

rnarubin commented Mar 5, 2020

Regarding borrowing data: it sounds to me like Pin provides exactly the guarantee needed, that if the pinned memory is invalidated then drop must have been called. The problem is that the child tasks must be stopped by the drop associated with the pinned and borrowed data

    let data = vec![1, 2, 3];
|-> let data_ref = &data; 
|
|   let mut scope_fut = task::scope(|scope| async move {
|     scope.spawn(async {
|-----> foobar(data_ref)
|     });
|   });
|
|   futures::poll!(scope_fut.as_mut());
|-> std::mem::forget(scope_fut);
|
| doesn't work because we can't require that scope_fut is dropped with the pinned memory

But there is a future here that we do have control over, and can add to its drop body: the scope future itself. Given that we take ownership of the closure and store it in the scope future's "stack", it should be possible for the child tasks to borrow and share data from within the closure:

    let mut scope_fut = task::scope(|scope| async move {
      let data = vec![1, 2, 3];
|---> let data_ref = &data; 
|
|     scope.spawn(async {
|-----> foobar(data_ref)
|     });
|
|     scope.spawn(async {
|-----> bizbuz(data_ref)
|     });
|   });
|
|   futures::poll!(scope_fut.as_mut());
|-> std::mem::forget(scope_fut);
|
| forgetting the future is fine, because the data is leaked so the children can run as long as they want

The children will be stopped by the drop call that also owns the referenced data, so the tasks can't outlive their references. It's also fine to never drop the future, in which case the data is leaked so references to it are still valid. This isn't a totally satisfying answer in that we can't borrow from the actual parent task. But it is a useful step in the right direction, at least the children can share borrowed data while running in parallel.
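The "leaked data stays valid" half of this argument can be shown with plain std threads. This is only an analogy under my own assumptions: sum_and_len is a made-up function, and Box::leak plays the role of a forgotten scope future that owned the data.

```rust
use std::thread;

/// Leaks `values` and lets two unscoped threads share the reference.
/// Returns the sum and the length computed by the two children.
fn sum_and_len(values: Vec<u32>) -> (u32, usize) {
    // Leaking yields a `&'static` reference: the data is never freed,
    // so children may keep using it for as long as they run.
    let data: &'static [u32] = Box::leak(values.into_boxed_slice());

    let sum = thread::spawn(move || data.iter().sum::<u32>());
    let len = thread::spawn(move || data.len());

    (
        sum.join().expect("sum worker panicked"),
        len.join().expect("len worker panicked"),
    )
}
```

The memory is lost for the rest of the program, but no reference to it can ever dangle, which matches the forget case described above.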

Regarding forced cancellation: I suspect there won't be a great solution without async destructors

@Darksonn
Contributor

Darksonn commented Mar 5, 2020

forgetting the future is fine, because the data is leaked so the children can run as long as they want

I mean, you can't actually forget a non-Unpin pinned future with std::mem::forget because it takes the future by value, thus calling it is a move.

Matthias247 added a commit to Matthias247/tokio that referenced this issue May 31, 2020
This change adds task::scope as a mechanism for supporting
structured concurrency as described in tokio-rs#1879.

In this version of the change scopes can be created using a freestanding
`task::scope` function, which creates a new detached scope (which is not
coupled to the parent scope), `Scope::detached` and `Scope::with_parent`,
and a `::child()` method on an existing scope handle.

Since a few of those methods are doing the same thing, some of those
might get dropped before the change is merged. For future extensibility
also a `Scope::with_config` and `ScopeConfigBuilder` are demonstrated in
this change. Those might also be part of the initial change.
Matthias247 added a commit to Matthias247/tokio that referenced this issue Jun 2, 2020
This change adds task::scope as a mechanism for supporting
structured concurrency as described in tokio-rs#1879.

This version of the scope implementation makes use of implicit scopes,
which are propagated within the task system through task local storage.

Every task spawned via `scope::spawn` or `scope::spawn_cancellable` is
automatically attached to its current scope without having to
explicitly attach to it. This provides stronger guarantees, since child
tasks in this model will never be able to outlive the parent - there is
no `ScopeHandle` available to spawn a task on a certain scope after this
is finished.

One drawback of this approach is however that since no `ScopeHandle` is
available, we also can't tie the lifetime of tasks and their
`JoinHandle`s to this scope. This makes it less likely that we could
borrow data from the parent task using this approach.

One benefit however is that there seems to be an interesting migration
path from tokio's current task system to this scoped approach:
- Using `tokio::spawn` could in the future be equivalent to spawning
  a task on the runtime's implicit top level scope. The task would not
  be force-cancellable, in the same fashion as tasks spawned via
  `scope::spawn` are not cancellable.
- Shutting down the runtime could be equivalent to leaving a scope: The
  remaining running tasks get a graceful cancellation signal and the
  scope would wait for those tasks to finish.
- However since the Runtime would never have to force-cancel a task
  (people would opt into this behavior using `scope::spawn_cancellable`)
  the `JoinError` could be removed from the "normal" spawn API. It is
  still available for cancellable spawns.
@carllerche
Member

I'm going to close this now as there is no way forward. More details here #2596 (comment)

Thanks again for all the work you did on this. Even though we didn't manage to get this through, I personally learned a lot on the topic and from your research. Also, once again, I'm sorry that I didn't put more time into this sooner.

@najamelan
Contributor

najamelan commented Jul 24, 2020

If people are interested in structured concurrency and can live without a borrowing scope, check out async_nursery. Disclaimer: I wrote that. If you are specifically after borrowing, you could check out async-scoped.

mrkline added a commit to mrkline/backpak that referenced this issue Dec 27, 2021
Tokio - and Rust's async model in general - is pretty freaking cool, but
it isn't a perfect fit for everything. After hammering for a few days,
I'm pretty confident that it's not working out here:

- There's no way to enforce scoped async tasks without blocking the
  current thread.[1][2][3] This means that there's no async task
  equivalent to Rayon/Crossbeam-like scopes, and you're back to arcs and
  pins and all sorts of fun boilerplate if you'd like to foster
  parallelism with task::spawn().

- Traits and recursive calls need lots o' boxes, implemented by proc
  macros at best and by hand at worst.

- Since many FS syscalls block, tokio asyncifies them by wrapping each
  in a spawn_blocking(), which spawns a dedicated thread. Of course you
  can wrap chunks of synchronous file I/O in spawn_blocking() if kicking
  off a separate thread for each File::open() call doesn't sound fun,
  but that means you can't interact with async code anywhere inside...

- Add in the usual sprinkling of async/await/join/etc. throughout the
  code base - since anything that awaits needs to be a future itself,
  async code has a habit of bubbling up the stack.

None of these are dealbreakers by themselves, but they can add up to
real overheads. Not just cognitive, but performance too, especially if
you've already got a design with concurrent tasks that do a decent job
of saturating I/O and farming CPU-heavy work out to a thread pool.
< gestures around >

[1]: tokio-rs/tokio#1879
[2]: tokio-rs/tokio#2596
[3]: https://docs.rs/async-scoped/latest/async_scoped/struct.Scope.html#safety-3
@Skepfyr

Skepfyr commented Sep 11, 2022

I think this is possible in a way that hasn't been discussed yet, although it does have tradeoffs.
If there was a tokio::scope(|s| s.spawn(...)) style function that blocked the thread, there's nothing to say that the thread couldn't do something useful while it was blocked. For example it could do something like Runtime::block_on, essentially restarting the runtime from this point on the stack.
The major downside is that this makes it easier to blow the stack, if too many scoped spawns stacked on top of one another I suspect you could quite easily fill up your stack. However I've got no idea how much of a problem that would be in practice.

@Darksonn
Contributor

That also has the issue that it breaks any types of in-task concurrency such as tokio::join!, join_all or FuturesUnordered.

@Skepfyr

Skepfyr commented Sep 11, 2022

Sorry I don't quite understand, are you talking about a situation where a Task joins two tasks that both call scope? ... that does seem like it would be broken.

@Darksonn
Contributor

Even if only one branch of a tokio::join! calls scope, then it would still break as the other branches would be unable to run during the call to scope.
