-
Notifications
You must be signed in to change notification settings - Fork 166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve executor ergonomics #26
Changes from all commits
ab3439e
6de6973
1507cbd
da0c3a9
cdb2774
a39909e
2e73947
a2fe8eb
5b4f4fc
6db555e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed under the | ||
// MIT/Apache-2.0 License, at your convenience | ||
// | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. | ||
// | ||
use futures::future::join_all; | ||
use scipio::{Local, LocalExecutor}; | ||
use std::io::Result; | ||
|
||
async fn hello() { | ||
let mut tasks = vec![]; | ||
for t in 0..5 { | ||
tasks.push(Local::local(async move { | ||
println!("{}: Hello {} ...", Local::id(), t); | ||
Local::later().await; | ||
println!("{}: ... {} World!", Local::id(), t); | ||
})); | ||
} | ||
join_all(tasks).await; | ||
} | ||
|
||
fn main() -> Result<()> { | ||
// There are two ways to create an executor, demonstrated in this example. | ||
// | ||
// We can create it in the current thread, and run it separately later... | ||
let ex = LocalExecutor::new(Some(0))?; | ||
|
||
// Or we can spawn a new thread with an executor inside. | ||
let handle = LocalExecutor::spawn_executor("hello", Some(1), || async move { | ||
hello().await; | ||
})?; | ||
|
||
// If you create the executor manually, you have to run it like so. | ||
// | ||
// spawn_new() is the preferred way to create an executor! | ||
ex.run(async move { | ||
hello().await; | ||
}); | ||
|
||
// The newly spawned executor runs on a thread, so we need to join on | ||
// its handle so we can wait for it to finish | ||
handle.join().unwrap(); | ||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,9 @@ use std::future::Future; | |
use std::io; | ||
use std::pin::Pin; | ||
use std::rc::Rc; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::task::{Context, Poll}; | ||
use std::thread::{Builder, JoinHandle}; | ||
use std::time::{Duration, Instant}; | ||
|
||
use futures_lite::pin; | ||
|
@@ -46,6 +48,8 @@ use crate::task::waker_fn::waker_fn; | |
use crate::Reactor; | ||
use crate::{IoRequirements, Latency}; | ||
|
||
static EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); | ||
|
||
#[derive(Debug, Clone)] | ||
/// Error thrown when a Task Queue is not found. | ||
pub struct QueueNotFoundError { | ||
|
@@ -304,9 +308,29 @@ pub struct LocalExecutor { | |
queues: Rc<RefCell<ExecutorQueues>>, | ||
parker: parking::Parker, | ||
binding: Option<usize>, | ||
id: usize, | ||
} | ||
|
||
impl LocalExecutor { | ||
fn init(&mut self) -> io::Result<()> { | ||
if let Some(cpu) = self.binding { | ||
bind_to_cpu(cpu)?; | ||
} | ||
|
||
let queues = self.queues.clone(); | ||
let index = 0; | ||
|
||
let io_requirements = IoRequirements::new(Latency::NotImportant, 0); | ||
self.queues.borrow_mut().available_executors.insert( | ||
0, | ||
TaskQueue::new("default", 1000, io_requirements, move || { | ||
let mut queues = queues.borrow_mut(); | ||
queues.maybe_activate(index); | ||
}), | ||
); | ||
Ok(()) | ||
} | ||
|
||
/// Creates a single-threaded executor, optionally bound to a specific CPU | ||
/// | ||
/// # Examples | ||
|
@@ -322,28 +346,79 @@ impl LocalExecutor { | |
/// ``` | ||
pub fn new(binding: Option<usize>) -> io::Result<LocalExecutor> { | ||
let p = parking::Parker::new(); | ||
let le = LocalExecutor { | ||
let mut le = LocalExecutor { | ||
queues: ExecutorQueues::new(), | ||
parker: p, | ||
binding, | ||
id: EXECUTOR_ID.fetch_add(1, Ordering::Relaxed), | ||
}; | ||
|
||
if let Some(cpu) = binding { | ||
bind_to_cpu(cpu)?; | ||
} | ||
le.init()?; | ||
Ok(le) | ||
} | ||
|
||
let queues = le.queues.clone(); | ||
let index = 0; | ||
/// Creates a single-threaded executor, optionally bound to a specific CPU, inside | ||
/// a newly craeted thread. The parameter `name` specifies the name of the thread. | ||
/// | ||
/// This is a more ergonomic way to create a thread and then run an executor inside it | ||
/// This function panics if creating the thread or the executor fails. If you need more | ||
/// fine-grained error handling consider initializing those entities manually. | ||
/// | ||
/// | ||
/// # Examples | ||
/// | ||
/// ``` | ||
/// use scipio::LocalExecutor; | ||
/// | ||
/// // executor is a single thread, but not bound to any particular CPU. | ||
/// let handle = LocalExecutor::spawn_executor("myname", None, || async move { | ||
/// println!("hello"); | ||
/// }).unwrap(); | ||
/// | ||
/// handle.join().unwrap(); | ||
/// ``` | ||
#[must_use = "This spawns an executor on a thread, so you must acquire its handle and then join() to keep it alive"] | ||
pub fn spawn_executor<G, F, T>( | ||
name: &'static str, | ||
binding: Option<usize>, | ||
fut_gen: G, | ||
) -> io::Result<JoinHandle<()>> | ||
where | ||
G: FnOnce() -> F + std::marker::Send + 'static, | ||
F: Future<Output = T> + 'static, | ||
{ | ||
let id = EXECUTOR_ID.fetch_add(1, Ordering::Relaxed); | ||
|
||
Builder::new() | ||
.name(format!("{}-{}", name, id).to_string()) | ||
.spawn(move || { | ||
let mut le = LocalExecutor { | ||
queues: ExecutorQueues::new(), | ||
parker: parking::Parker::new(), | ||
binding, | ||
id, | ||
}; | ||
le.init().unwrap(); | ||
le.run(async move { | ||
let task = Task::local(async move { | ||
fut_gen().await; | ||
}); | ||
task.await; | ||
}) | ||
}) | ||
} | ||
|
||
let io_requirements = IoRequirements::new(Latency::NotImportant, 0); | ||
le.queues.borrow_mut().available_executors.insert( | ||
0, | ||
TaskQueue::new("default", 1000, io_requirements, move || { | ||
let mut queues = queues.borrow_mut(); | ||
queues.maybe_activate(index); | ||
}), | ||
); | ||
Ok(le) | ||
/// Returns a unique identifier for this Executor. | ||
/// | ||
/// # Examples | ||
/// ``` | ||
/// use scipio::LocalExecutor; | ||
/// | ||
/// let local_ex = LocalExecutor::new(None).expect("failed to create local executor"); | ||
/// println!("My ID: {}", local_ex.id()); | ||
/// ``` | ||
pub fn id(&self) -> usize { | ||
self.id | ||
glommer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/// Creates a task queue in the executor. | ||
|
@@ -717,6 +792,34 @@ impl<T> Task<T> { | |
} | ||
} | ||
|
||
/// Returns the id of the current executor | ||
/// | ||
/// If called from a [`LocalExecutor`], returns the id of the executor. | ||
/// | ||
/// Otherwise, this method panics. | ||
/// | ||
/// # Examples | ||
/// | ||
/// ``` | ||
/// use scipio::{LocalExecutor, Task}; | ||
/// | ||
/// let local_ex = LocalExecutor::new(None).expect("failed to create local executor"); | ||
/// | ||
/// local_ex.run(async { | ||
/// println!("my ID: {}", Task::<()>::id()); | ||
/// }); | ||
/// ``` | ||
pub fn id() -> usize | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having I think we should have a way to access the local executor (ala There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that in Rust, you can only safely use thread local things inside a scope. You cannot have an |
||
where | ||
T: 'static, | ||
{ | ||
if LOCAL_EX.is_set() { | ||
LOCAL_EX.with(|local_ex| local_ex.id()) | ||
} else { | ||
panic!("`Task::id()` must be called from a `LocalExecutor`") | ||
} | ||
} | ||
|
||
/// Detaches the task to let it keep running in the background. | ||
/// | ||
/// # Examples | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is exactly why making main() -> std::io::Result instead of expect() was a bad idea:
now the code has inconsistent error handling so that even main return type is useless because all other errors can't fit its type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, the fact Rust has many incompatible variants of Result<> type is painful. Maybe its worth having your own Result<> and return everywhere 1 type and make it compatible with at least 1 std (std::io ie).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only a problem (and apparently one known to the rust community) with
.join()
If you google this error message it should take you to places where the Rust folks tell you not to
?
on join.(although now that I am searching again I have been so far unable).
You will get similar results if you spawn a thread manually and then
join
it, so I am not too concerned.My understanding is that it would be possible to make this better and
?
on join if we implemented an error wrapper onio::Error
that is send, but I am not too concerned about that either: if a thread fails in an unrecoverable way (at least for us), you are likely to want to panic the whole thing anyway: otherwise you are left with executors that are running while others have failed.I feel like the example is good now: The user can see how to handle errors that happen at spawn time, and for asynchronous errors coming from threads we are recommending unwrap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree about the errors, but in practice it seems to me that everyone standardized on
io::Result
so as long as your errors are convertible to and from it, all should be goodThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To split the difference, this example could just cheat and use
Result<(), Box<dyn std::error::Error>>
like the Tokio documentation does: https://docs.rs/tokio/0.2.22/tokio/runtime/index.htmlThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Daniel - as we discussed, this won't work in practice because
JoinHandle
transforms the error into a boxedstd::any::Any
whose size is not known.I don't know of a very good way to
?
from a join, but this is a problem thatthread::spawn
has already. We can fix the example when we find a good way (I'd like to), but tabling for now so we can make progress