Skip to content

Commit

Permalink
Use bounded queue when checks run concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaReiser committed Apr 27, 2024
1 parent a87387c commit 3ea16cb
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion crates/red_knot/src/program/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::db::SourceDb;
use crate::files::FileId;
use crate::lint::Diagnostics;
use crate::program::Program;
use rayon::max_num_threads;
use rustc_hash::FxHashSet;
use std::num::NonZeroUsize;

impl Program {
/// Checks all open files in the workspace and its dependencies.
Expand Down Expand Up @@ -61,6 +63,11 @@ pub trait CheckScheduler {
///
/// The implementation should call [`CheckFileTask::run`] to execute the check.
fn check_file(&self, file_task: CheckFileTask);

/// The maximum number of checks that can be run concurrently.
///
/// Returns `None` if the checks run on the current thread (no concurrency).
fn max_concurrency(&self) -> Option<NonZeroUsize>;
}

/// Scheduler that runs checks on a rayon thread pool.
Expand Down Expand Up @@ -88,6 +95,10 @@ where
self.scope
.spawn(move |_| child_span.in_scope(|| check_file_task.run(program)));
}

fn max_concurrency(&self) -> Option<NonZeroUsize> {
Some(NonZeroUsize::new(max_num_threads()).unwrap_or(NonZeroUsize::MIN))
}
}

/// Scheduler that runs all checks on the current thread.
Expand All @@ -108,6 +119,10 @@ impl CheckScheduler for SameThreadCheckScheduler<'_> {
fn check_file(&self, task: CheckFileTask) {
task.run(self.program)
}

fn max_concurrency(&self) -> Option<NonZeroUsize> {
None
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -195,7 +210,13 @@ impl<'a> CheckFilesLoop<'a> {
}

fn run(mut self, files: impl Iterator<Item = FileId>) -> Result<Vec<String>, CheckError> {
let (sender, receiver) = crossbeam_channel::unbounded();
let (sender, receiver) = if let Some(max_concurrency) = self.scheduler.max_concurrency() {
crossbeam_channel::bounded(max_concurrency.get())
} else {
// The checks run on the current thread. That means it is necessary to store all messages
// or we risk deadlocking when the main loop never gets a chance to read the messages.
crossbeam_channel::unbounded()
};

let context = CheckContext::new(self.cancellation_token.clone(), sender.clone());

Expand Down

0 comments on commit 3ea16cb

Please sign in to comment.