Skip to content

Commit

Permalink
Use task specific rayon threadpools and not the global threadpool (#…
Browse files Browse the repository at this point in the history
…1872)

The global `rayon` threadpool has a variable number of threads:

- An environment variable can be used to specify the number:
RAYON_NUM_THREADS
 - If not set, The number of CPUs on the executing system is used

Relying on using the global pool may lead to a situation where the
global threadpool does not have enough threads for `rover dev` to
execute correctly. It's not realistic to expect customers to know how to
set this environment variable and a better option is available.

At each point in `rover` where we use `rayon`, create a thread pool
which is task specific and explicitly sets the number of threads
available to the pool.

For example:

```
    let tp = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(|idx| format!("router-runner-{idx}"))
        .build()
        .map_err(|err| {
            RoverError::new(anyhow!("could not create router runner thread pool: {err}",))
        })?;
```
This has the added advantage of allowing us to provide a name for all
the threads created by the pool which is helpful debugging information.

Note: It might be that a more comprehensive answer to this problem is to
not use `rayon` at all, but that's a bigger change than I want to
consider for fixing this specific issue.

also:
- change the configuration of the `interprocess` crate so that
non-blocking isn't available (it's broken)
 - close `stdin` on our spawned router to reduce risk of bad input
- handle errors if we can't take `stdout` || `stderr` from our spawned
router

I've tested these changes manually on my laptop by setting
RAYON_NUM_THREADS=2 and using `rover dev` without observing any hangs.
Without these changes, `rover dev` hangs immediately.

fixes: #1871
  • Loading branch information
garypen authored Mar 13, 2024
1 parent df24356 commit 7bc235e
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 51 deletions.
22 changes: 0 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ heck = "0.4"
humantime = "2.1.0"
httpmock = "0.6"
hyper = "0.14"
interprocess = "1"
interprocess = { version = "1", default-features = false }
indoc = "2"
lazycell = "1"
lazy_static = "1.4"
Expand Down
8 changes: 7 additions & 1 deletion crates/rover-std/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,13 @@ impl Fs {
P: AsRef<Utf8Path>,
{
let path = path.as_ref().to_string();
rayon::spawn(move || {
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("file-watcher-{idx}"))
.build()
.expect("thread pool built successfully");
tp.spawn(move || {
eprintln!("{}watching {} for changes", Emoji::Watch, &path);

let (fs_tx, fs_rx) = channel();
Expand Down
12 changes: 10 additions & 2 deletions src/command/dev/do_dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ impl Dev {
let leader_channel = LeaderChannel::new();
let follower_channel = FollowerChannel::new();

// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("router-do-dev-{idx}"))
.build()
.map_err(|err| {
RoverError::new(anyhow!("could not create router do dev thread pool: {err}",))
})?;
if let Some(mut leader_session) = LeaderSession::new(
override_install_path,
&client_config,
Expand All @@ -48,7 +56,7 @@ impl Dev {
leader_channel.receiver,
);

rayon::spawn(move || {
tp.spawn(move || {
ctrlc::set_handler(move || {
eprintln!(
"\n{}shutting down the `rover dev` session and all attached processes...",
Expand Down Expand Up @@ -120,7 +128,7 @@ impl Dev {

// start the interprocess socket health check in the background
let health_messenger = follower_messenger.clone();
rayon::spawn(move || {
tp.spawn(move || {
let _ = health_messenger.health_check().map_err(|_| {
eprintln!("{}shutting down...", Emoji::Stop);
std::process::exit(1);
Expand Down
10 changes: 9 additions & 1 deletion src/command/dev/protocol/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,15 @@ impl LeaderSession {

let follower_message_sender = self.follower_channel.sender.clone();
let leader_message_receiver = self.leader_channel.receiver.clone();
rayon::spawn(move || {
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("router-leader-{idx}"))
.build()
.map_err(|err| {
RoverError::new(anyhow!("could not create router leader thread pool: {err}",))
})?;
tp.spawn(move || {
listener
.incoming()
.filter_map(handle_socket_error)
Expand Down
63 changes: 42 additions & 21 deletions src/command/dev/router/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl BackgroundTask {
command.args(args).env("APOLLO_ROVER", "true");

command.stdout(Stdio::piped()).stderr(Stdio::piped());
command.stdin(Stdio::null());

if let Ok(apollo_graph_ref) = var("APOLLO_GRAPH_REF") {
command.env("APOLLO_GRAPH_REF", apollo_graph_ref);
Expand Down Expand Up @@ -81,31 +82,51 @@ impl BackgroundTask {
.spawn()
.with_context(|| "could not spawn child process")?;

if let Some(stdout) = child.stdout.take() {
let log_sender = log_sender.clone();
rayon::spawn(move || {
let stdout = BufReader::new(stdout);
stdout.lines().for_each(|line| {
if let Ok(line) = line {
log_sender
.send(BackgroundTaskLog::Stdout(line))
.expect("could not update stdout logs for command");
}
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.thread_name(|idx| format!("router-command-{idx}"))
.build()
.map_err(|err| {
RoverError::new(anyhow!(
"could not create router command thread pool: {err}",
))
})?;
match child.stdout.take() {
Some(stdout) => {
let log_sender = log_sender.clone();
tp.spawn(move || {
let stdout = BufReader::new(stdout);
stdout.lines().for_each(|line| {
if let Ok(line) = line {
log_sender
.send(BackgroundTaskLog::Stdout(line))
.expect("could not update stdout logs for command");
}
});
});
});
}
None => {
return Err(anyhow!("Could not take stdout from spawned router").into());
}
}

if let Some(stderr) = child.stderr.take() {
rayon::spawn(move || {
let stderr = BufReader::new(stderr);
stderr.lines().for_each(|line| {
if let Ok(line) = line {
log_sender
.send(BackgroundTaskLog::Stderr(line))
.expect("could not update stderr logs for command");
}
match child.stderr.take() {
Some(stderr) => {
tp.spawn(move || {
let stderr = BufReader::new(stderr);
stderr.lines().for_each(|line| {
if let Ok(line) = line {
log_sender
.send(BackgroundTaskLog::Stderr(line))
.expect("could not update stderr logs for command");
}
});
});
});
}
None => {
return Err(anyhow!("Could not take stderr from spawned router").into());
}
}

Ok(Self { child, descriptor })
Expand Down
18 changes: 16 additions & 2 deletions src/command/dev/router/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,15 @@ impl RouterConfigHandler {
// if a router config was passed, start watching it in the background for changes

if let Some(state_receiver) = self.config_reader.watch() {
rayon::spawn(move || loop {
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("router-config-{idx}"))
.build()
.map_err(|err| {
RoverError::new(anyhow!("could not create router config thread pool: {err}",))
})?;
tp.spawn(move || loop {
let config_state = state_receiver
.recv()
.expect("could not watch router config");
Expand Down Expand Up @@ -271,7 +279,13 @@ impl RouterConfigReader {
let (raw_tx, raw_rx) = unbounded();
let (state_tx, state_rx) = unbounded();
Fs::watch_file(input_config_path, raw_tx);
rayon::spawn(move || loop {
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("router-config-reader-{idx}"))
.build()
.ok()?;
tp.spawn(move || loop {
raw_rx
.recv()
.expect("could not watch router configuration file");
Expand Down
10 changes: 9 additions & 1 deletion src/command/dev/router/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,15 @@ impl RouterRunner {
let warn_prefix = Style::WarningPrefix.paint("WARN:");
let error_prefix = Style::ErrorPrefix.paint("ERROR:");
let unknown_prefix = Style::ErrorPrefix.paint("UNKNOWN:");
rayon::spawn(move || loop {
// Build a Rayon Thread pool
let tp = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|idx| format!("router-runner-{idx}"))
.build()
.map_err(|err| {
RoverError::new(anyhow!("could not create router runner thread pool: {err}",))
})?;
tp.spawn(move || loop {
while let Ok(log) = router_log_receiver.recv() {
match log {
BackgroundTaskLog::Stdout(stdout) => {
Expand Down

0 comments on commit 7bc235e

Please sign in to comment.