Skip to content

Commit

Permalink
Merge pull request #34 from mtshiba/main
Browse files Browse the repository at this point in the history
Kill the subprocess when the client/server is dead
  • Loading branch information
cordx56 authored Feb 18, 2025
2 parents 1885c5c + f96560d commit 56c6bd2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 16 deletions.
34 changes: 34 additions & 0 deletions rustowl/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions rustowl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ simple_logger = { version = "5.0.0", features = ["stderr"] }
tokio.workspace = true
tower-lsp = "0.20.0"
mktemp = "0.5.1"
process_alive = "0.1.1"

[target.'cfg(unix)'.dependencies]
libc = "0.2.169"

[[bin]]
name = "cargo-owl"
Expand Down
87 changes: 71 additions & 16 deletions rustowl/src/owlsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,16 @@ fn search_cargo(p: &PathBuf) -> Vec<PathBuf> {
res
}

type Subprocess = Option<u32>;

#[derive(Debug)]
struct Backend {
#[allow(unused)]
client: Client,
roots: Arc<RwLock<HashMap<PathBuf, PathBuf>>>,
analyzed: Arc<RwLock<Option<Workspace>>>,
processes: Arc<RwLock<JoinSet<()>>>,
subprocesses: Arc<RwLock<Vec<Subprocess>>>,
}

impl Backend {
Expand All @@ -604,6 +607,7 @@ impl Backend {
roots: Arc::new(RwLock::new(HashMap::new())),
analyzed: Arc::new(RwLock::new(None)),
processes: Arc::new(RwLock::new(JoinSet::new())),
subprocesses: Arc::new(RwLock::new(vec![])),
}
}
async fn set_roots(&self, uri: &lsp_types::Url) {
Expand All @@ -623,7 +627,18 @@ impl Backend {
}
}

async fn analzye(&self) {
async fn abort_subprocess(&self) {
while let Some(pid) = self.subprocesses.write().await.pop() {
if let Some(pid) = pid {
#[cfg(unix)]
unsafe {
libc::killpg(pid.try_into().unwrap(), libc::SIGTERM);
}
}
}
}

async fn analyze(&self) {
// wait for rust-analyzer
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
{
Expand All @@ -632,21 +647,30 @@ impl Backend {
let roots = { self.roots.read().await.clone() };
let mut join = self.processes.write().await;
join.shutdown().await;
self.abort_subprocess().await;
for (root, target) in roots {
let mut child = process::Command::new("rustup")
.arg("run")
.arg(TOOLCHAIN_VERSION)
.arg("cargo")
.arg("owl")
.arg(&root)
.arg(&target)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.spawn()
.unwrap();
let mut child = unsafe {
process::Command::new("rustup")
.arg("run")
.arg(TOOLCHAIN_VERSION)
.arg("cargo")
.arg("owl")
.arg(&root)
.arg(&target)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.kill_on_drop(true)
.pre_exec(|| {
#[cfg(unix)]
libc::setsid();
Ok(())
})
.spawn()
.unwrap()
};
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let analyzed = self.analyzed.clone();
tokio::spawn(async move {
join.spawn(async move {
while let Ok(Some(line)) = stdout.next_line().await {
if let Ok(ws) = serde_json::from_str::<Workspace>(&line) {
let write = &mut *analyzed.write().await;
Expand All @@ -658,9 +682,11 @@ impl Backend {
}
}
});
let pid = child.id();
join.spawn(async move {
let _ = child.wait().await;
});
self.subprocesses.write().await.push(pid);
}
}
async fn cleanup_targets(&self) {
Expand Down Expand Up @@ -721,6 +747,23 @@ impl Backend {
}
}

impl Drop for Backend {
fn drop(&mut self) {
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(err) => {
log::error!("failed to create async runtime for graceful shutdown: {err}");
return;
}
};
rt.block_on(async {
if let Err(err) = self.shutdown().await {
log::error!("failed to shutdown the server gracefully: {err}");
};
});
}
}

#[tower_lsp::async_trait]
impl LanguageServer for Backend {
async fn initialize(
Expand Down Expand Up @@ -753,13 +796,24 @@ impl LanguageServer for Backend {
capabilities: server_cap,
..Default::default()
};
let health_checker = async move {
if let Some(process_id) = params.process_id {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
if !process_alive::state(process_alive::Pid::from(process_id)).is_alive() {
panic!("The client process is dead");
}
}
}
};
tokio::spawn(health_checker);
Ok(init_res)
}
async fn initialized(&self, _p: lsp_types::InitializedParams) {
self.analzye().await;
self.analyze().await;
}
async fn did_save(&self, _params: lsp_types::DidSaveTextDocumentParams) {
self.analzye().await;
self.analyze().await;
}
async fn did_change(&self, _params: lsp_types::DidChangeTextDocumentParams) {
*self.analyzed.write().await = None;
Expand All @@ -773,12 +827,13 @@ impl LanguageServer for Backend {
for added in params.event.added {
self.set_roots(&added.uri).await;
}
self.analzye().await;
self.analyze().await;
}

async fn shutdown(&self) -> jsonrpc::Result<()> {
self.cleanup_targets().await;
self.processes.write().await.shutdown().await;
self.abort_subprocess().await;
Ok(())
}
}
Expand Down

0 comments on commit 56c6bd2

Please sign in to comment.