From e066c9f8f44c7e971af37bf6388d1368571455b0 Mon Sep 17 00:00:00 2001 From: Zhang Tianyang Date: Fri, 19 Jan 2024 18:35:35 +0800 Subject: [PATCH 1/3] task: report more OCI runtime errors Signed-off-by: Zhang Tianyang --- vmm/task/src/container.rs | 184 ++++++++++++++++++++++++++++++-------- 1 file changed, 149 insertions(+), 35 deletions(-) diff --git a/vmm/task/src/container.rs b/vmm/task/src/container.rs index 8ac89465..37194343 100644 --- a/vmm/task/src/container.rs +++ b/vmm/task/src/container.rs @@ -15,7 +15,8 @@ limitations under the License. */ use std::{ - convert::TryFrom, os::unix::prelude::ExitStatusExt, path::Path, process::ExitStatus, sync::Arc, + convert::TryFrom, io::SeekFrom, os::unix::prelude::ExitStatusExt, path::Path, + process::ExitStatus, sync::Arc, }; use async_trait::async_trait; @@ -48,7 +49,7 @@ use runc::{options::GlobalOpts, Runc, Spawner}; use serde::Deserialize; use tokio::{ fs::File, - io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}, + io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader}, process::Command, sync::Mutex, }; @@ -245,28 +246,67 @@ impl KuasarFactory { } // runtime_error will read the OCI runtime logfile retrieving OCI runtime error -pub async fn runtime_error(bundle: &str, e: runc::error::Error, msg: &str) -> Error { +pub async fn runtime_error(bundle: &str, r_err: runc::error::Error, msg: &str) -> Error { + match get_last_runtime_error(bundle).await { + Err(e) => other!( + "{}: unable to retrieve OCI runtime error ({}): {}", + msg, + e, + r_err + ), + Ok(rt_msg) => { + if rt_msg.is_empty() { + other!("{}: empty msg in log file: {}", msg, r_err) + } else { + other!("{}: {}", msg, rt_msg) + } + } + } +} + +async fn get_last_runtime_error(bundle: &str) -> Result { + let log_path = Path::new(bundle).join("log.json"); let mut rt_msg = String::new(); - match File::open(Path::new(bundle).join("log.json")).await { - Err(err) => other!("{}: unable to open OCI runtime log file){}", msg, err), + match File::open(log_path).await { + Err(e) => Err(other!("unable to open OCI runtime log file: {}", e)), Ok(file) => { - let mut lines = BufReader::new(file).lines(); - while let Ok(Some(line)) = lines.next_line().await { - // Retrieve the last runtime error - match serde_json::from_str::(&line) { - Err(err) => return other!("{}: unable to parse log msg: {}", msg, err), - Ok(log) => { - if log.level == "error" { - rt_msg = log.msg.trim().to_string(); + let mut reader = BufReader::new(file); + let file_size = reader + .seek(SeekFrom::End(0)) + .await + .map_err(other_error!(e, "error seek from start"))?; + + let mut pre_buffer: Option> = None; + let mut buffer = Vec::new(); + + for offset in (0..file_size).rev() { + if offset == 0 { + break; + } + reader + .seek(SeekFrom::Start(offset)) + .await + .map_err(other_error!(e, "error seek"))?; + let result = reader + .read_until(b'\n', &mut buffer) + .await + .map_err(other_error!(e, "reading from cursor fail"))?; + if result == 1 && pre_buffer.is_some() { + let line = String::from_utf8_lossy(&pre_buffer.unwrap()).into_owned(); + match serde_json::from_str::(&line) { + Err(e) => return Err(other!("unable to parse log msg({}): {}", line, e)), + Ok(log) => { + if log.level == "error" { + rt_msg = log.msg.trim().to_string(); + break; + } } } } + pre_buffer = Some(buffer.clone()); + buffer.clear(); } - if !rt_msg.is_empty() { - other!("{}: {}", msg, rt_msg) - } else { - other!("{}: (no OCI runtime error in logfile) {}", msg, e) - } + Ok(rt_msg) } } } @@ -305,10 +345,9 @@ impl ProcessFactory for KuasarExecFactory { #[async_trait] impl ProcessLifecycle for KuasarInitLifecycle { async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { - self.runtime - .start(p.id.as_str()) - .await - .map_err(other_error!(e, "failed start"))?; + if let Err(e) = self.runtime.start(p.id.as_str()).await { + return Err(runtime_error(&p.lifecycle.bundle, e, "OCI runtime start failed").await); + } p.state = Status::RUNNING; Ok(()) } @@ -319,31 +358,37 @@ impl ProcessLifecycle for KuasarInitLifecycle { signal: u32, all: bool, ) -> containerd_shim::Result<()> { - self.runtime + if let Err(r_err) = self + .runtime .kill( p.id.as_str(), signal, Some(&runc::options::KillOpts { all }), ) .await - .map_err(|e| check_kill_error(e.to_string())) + { + let e = runtime_error(&p.lifecycle.bundle, r_err, "OCI runtime kill failed").await; + + return Err(check_kill_error(e.to_string())); + } + Ok(()) } async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { - self.runtime + if let Err(e) = self + .runtime .delete( p.id.as_str(), Some(&runc::options::DeleteOpts { force: true }), ) .await - .or_else(|e| { - if !e.to_string().to_lowercase().contains("does not exist") { - Err(e) - } else { - Ok(()) - } - }) - .map_err(other_error!(e, "failed delete"))?; + { + if !e.to_string().to_lowercase().contains("does not exist") { + return Err( + runtime_error(&p.lifecycle.bundle, e, "OCI runtime delete failed").await, + ); + } + } self.exit_signal.signal(); Ok(()) } @@ -416,7 +461,8 @@ impl KuasarInitLifecycle { impl ProcessLifecycle for KuasarExecLifecycle { async fn start(&self, p: &mut ExecProcess) -> containerd_shim::Result<()> { rescan_pci_bus().await?; - let pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", &p.id)); + let bundle = self.bundle.to_string(); + let pid_path = Path::new(&bundle).join(format!("{}.pid", &p.id)); let mut exec_opts = runc::options::ExecOpts { io: None, pid_file: Some(pid_path.to_owned()), @@ -441,7 +487,7 @@ impl ProcessLifecycle for KuasarExecLifecycle { if let Some(s) = socket { s.clean().await; } - return Err(other!("failed to start runc exec: {}", e)); + return Err(runtime_error(&bundle, e, "OCI runtime exec failed").await); } copy_io_or_console(p, socket, pio, p.lifecycle.exit_signal.clone()).await?; let pid = read_file_to_str(pid_path).await?.parse::()?; @@ -613,3 +659,71 @@ pub fn check_kill_error(emsg: String) -> Error { other!("unknown error after kill {}", emsg) } } + +#[cfg(test)] +mod tests { + use std::path::Path; + + use containerd_shim::util::{mkdir, write_str_to_file}; + use tokio::fs::remove_dir_all; + + use crate::container::runtime_error; + + #[tokio::test] + async fn test_runtime_error_with_logfile() { + let empty_err = runc::error::Error::NotFound; + let log_json = "\ + {\"level\":\"info\",\"msg\":\"hello word\",\"time\":\"2022-11-25\"}\n\ + {\"level\":\"error\",\"msg\":\"failed error\",\"time\":\"2022-11-26\"}\n\ + {\"level\":\"error\",\"msg\":\"panic\",\"time\":\"2022-11-27\"}\n\ + {\"level\":\"info\",\"msg\":\"program exit\",\"time\":\"2024-1-24\"}\n\ + "; + let test_dir = "/tmp/kuasar-test_runtime_error_with_logfile"; + let _ = mkdir(test_dir, 0o711).await; + let test_log_file = Path::new(test_dir).join("log.json"); + write_str_to_file(test_log_file.as_path(), log_json) + .await + .expect("write log json should not be error"); + + let expected_msg = "panic"; + let actual_err = runtime_error( + test_dir, + empty_err, + "test_runtime_error_with_logfile failed", + ) + .await; + remove_dir_all(test_dir).await.expect("remove test dir"); + assert!( + actual_err.to_string().contains(expected_msg), + "actual error \"{}\" should contains \"{}\"", + actual_err.to_string(), + expected_msg + ); + } + + #[tokio::test] + async fn test_runtime_error_without_logfile() { + let empty_err = runc::error::Error::NotFound; + let test_dir = "/tmp/kuasar-test_runtime_error_without_logfile"; + let _ = remove_dir_all(test_dir).await; + assert!( + !Path::new(test_dir).exists(), + "{} should not exist", + test_dir + ); + + let expected_msg = "Unable to locate the runc"; + let actual_err = runtime_error( + test_dir, + empty_err, + "test_runtime_error_without_logfile failed", + ) + .await; + assert!( + actual_err.to_string().contains(expected_msg), + "actual error \"{}\" should contains \"{}\"", + actual_err.to_string(), + expected_msg + ); + } +} From 0b8d74e0181675d3a02d9b05090efb29d16c2aed Mon Sep 17 00:00:00 2001 From: Zhang Tianyang Date: Fri, 19 Jan 2024 18:39:42 +0800 Subject: [PATCH 2/3] task: ipmlement close_io Signed-off-by: Zhang Tianyang --- vmm/sandbox/Cargo.lock | 8 ++++---- vmm/task/Cargo.lock | 8 ++++---- vmm/task/src/container.rs | 1 + vmm/task/src/io.rs | 17 +++++++++++++++-- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/vmm/sandbox/Cargo.lock b/vmm/sandbox/Cargo.lock index dc21f0ff..5a461b8d 100644 --- a/vmm/sandbox/Cargo.lock +++ b/vmm/sandbox/Cargo.lock @@ -426,7 +426,7 @@ dependencies = [ [[package]] name = "containerd-sandbox" version = "0.1.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#57a1d4b87050d32b76ec51bf1c99cfddd74558f1" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "anyhow", "async-stream", @@ -454,7 +454,7 @@ dependencies = [ [[package]] name = "containerd-shim" version = "0.3.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#57a1d4b87050d32b76ec51bf1c99cfddd74558f1" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "async-trait", "cgroups-rs 0.2.11", @@ -485,7 +485,7 @@ dependencies = [ [[package]] name = "containerd-shim-protos" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#57a1d4b87050d32b76ec51bf1c99cfddd74558f1" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "async-trait", "protobuf 3.2.0", @@ -1561,7 +1561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "059a34f111a9dee2ce1ac2826a68b24601c4298cfeb1a587c3cb493d5ab46f52" dependencies = [ "libc", - "nix 0.26.2", + "nix 0.20.0", ] [[package]] diff --git a/vmm/task/Cargo.lock b/vmm/task/Cargo.lock index 61ff1245..26d1c7cc 100644 --- a/vmm/task/Cargo.lock +++ b/vmm/task/Cargo.lock @@ -198,7 +198,7 @@ dependencies = [ [[package]] name = "containerd-sandbox" version = "0.1.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "anyhow", "async-stream", @@ -226,7 +226,7 @@ dependencies = [ [[package]] name = "containerd-shim" version = "0.3.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "async-trait", "cgroups-rs", @@ -257,7 +257,7 @@ dependencies = [ [[package]] name = "containerd-shim-protos" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "async-trait", "protobuf 3.2.0", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "runc" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" +source = "git+https://github.com/kuasar-io/rust-extensions.git#e915494234c1586902f09f8d322dd2937e916144" dependencies = [ "async-trait", "futures", diff --git a/vmm/task/src/container.rs b/vmm/task/src/container.rs index 37194343..e3c1edbb 100644 --- a/vmm/task/src/container.rs +++ b/vmm/task/src/container.rs @@ -338,6 +338,7 @@ impl ProcessFactory for KuasarExecFactory { spec: p, exit_signal: Default::default(), }), + stdin: Arc::new(Mutex::new(None)), }) } } diff --git a/vmm/task/src/io.rs b/vmm/task/src/io.rs index e62be195..f943e8db 100644 --- a/vmm/task/src/io.rs +++ b/vmm/task/src/io.rs @@ -127,7 +127,7 @@ pub(crate) async fn copy_io_or_console

( ) -> Result<()> { if p.stdio.terminal { if let Some(console_socket) = socket { - let console_result = copy_console(&console_socket, &p.stdio, exit_signal).await; + let console_result = copy_console(p, &console_socket, &p.stdio, exit_signal).await; console_socket.clean().await; match console_result { Ok(c) => { @@ -144,7 +144,8 @@ pub(crate) async fn copy_io_or_console

( Ok(()) } -async fn copy_console( +async fn copy_console

( + p: &ProcessTemplate

, console_socket: &ConsoleSocket, stdio: &Stdio, exit_signal: Arc, @@ -155,6 +156,18 @@ async fn copy_console( let f = unsafe { File::from_raw_fd(fd) }; if !stdio.stdin.is_empty() { debug!("copy_console: pipe stdin to console"); + + let stdin_clone = stdio.stdin.clone(); + let stdin_w = p.stdin.clone(); + // open the write side to make sure read side unblock, as open write side + // will block too, open it in another thread + tokio::spawn(async move { + if let Ok(stdin_file) = OpenOptions::new().write(true).open(stdin_clone).await { + let mut lock_guard = stdin_w.lock().await; + *lock_guard = Some(stdin_file); + } + }); + let console_stdin = f .try_clone() .await From 256a492e3e24bd1ed51cfb29441e877d160c17d0 Mon Sep 17 00:00:00 2001 From: Zhang Tianyang Date: Wed, 24 Jan 2024 12:42:12 +0800 Subject: [PATCH 3/3] task: fix leaking files of exec process Signed-off-by: Zhang Tianyang --- vmm/task/src/container.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vmm/task/src/container.rs b/vmm/task/src/container.rs index e3c1edbb..0499affb 100644 --- a/vmm/task/src/container.rs +++ b/vmm/task/src/container.rs @@ -48,7 +48,7 @@ use oci_spec::runtime::{LinuxResources, Process, Spec}; use runc::{options::GlobalOpts, Runc, Spawner}; use serde::Deserialize; use tokio::{ - fs::File, + fs::{remove_file, File}, io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader}, process::Command, sync::Mutex, @@ -519,8 +519,10 @@ impl ProcessLifecycle for KuasarExecLifecycle { } } - async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> { + async fn delete(&self, p: &mut ExecProcess) -> Result<()> { self.exit_signal.signal(); + let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id)); + remove_file(exec_pid_path).await.unwrap_or_default(); Ok(()) }