Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: some bugfixs #112

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vmm/sandbox/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions vmm/task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

191 changes: 154 additions & 37 deletions vmm/task/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ limitations under the License.
*/

use std::{
convert::TryFrom, os::unix::prelude::ExitStatusExt, path::Path, process::ExitStatus, sync::Arc,
convert::TryFrom, io::SeekFrom, os::unix::prelude::ExitStatusExt, path::Path,
process::ExitStatus, sync::Arc,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -47,8 +48,8 @@ use oci_spec::runtime::{LinuxResources, Process, Spec};
use runc::{options::GlobalOpts, Runc, Spawner};
use serde::Deserialize;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader},
fs::{remove_file, File},
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader},
process::Command,
sync::Mutex,
};
Expand Down Expand Up @@ -245,28 +246,67 @@ impl KuasarFactory {
}

// runtime_error builds the error reported for a failed OCI runtime call.
//
// It asks get_last_runtime_error for the latest runtime error recorded in the
// log file under `bundle` and combines it with the caller-supplied context:
//   - bundle: bundle directory passed on to get_last_runtime_error
//   - r_err:  the error returned by the runc call itself
//   - msg:    prefix describing which operation failed
pub async fn runtime_error(bundle: &str, e: runc::error::Error, msg: &str) -> Error {
pub async fn runtime_error(bundle: &str, r_err: runc::error::Error, msg: &str) -> Error {
match get_last_runtime_error(bundle).await {
// The log lookup itself failed: report both that failure and the runc error.
Err(e) => other!(
"{}: unable to retrieve OCI runtime error ({}): {}",
msg,
e,
r_err
),
Ok(rt_msg) => {
if rt_msg.is_empty() {
// Nothing was found in the log, so fall back to the runc error.
other!("{}: empty msg in log file: {}", msg, r_err)
} else {
// The message taken from the log replaces the runc error in the result.
other!("{}: {}", msg, rt_msg)
}
}
}
}

async fn get_last_runtime_error(bundle: &str) -> Result<String> {
let log_path = Path::new(bundle).join("log.json");
let mut rt_msg = String::new();
match File::open(Path::new(bundle).join("log.json")).await {
Err(err) => other!("{}: unable to open OCI runtime log file){}", msg, err),
match File::open(log_path).await {
Err(e) => Err(other!("unable to open OCI runtime log file: {}", e)),
Ok(file) => {
let mut lines = BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
// Retrieve the last runtime error
match serde_json::from_str::<Log>(&line) {
Err(err) => return other!("{}: unable to parse log msg: {}", msg, err),
Ok(log) => {
if log.level == "error" {
rt_msg = log.msg.trim().to_string();
let mut reader = BufReader::new(file);
let file_size = reader
.seek(SeekFrom::End(0))
.await
.map_err(other_error!(e, "error seek from start"))?;

let mut pre_buffer: Option<Vec<u8>> = None;
let mut buffer = Vec::new();

for offset in (0..file_size).rev() {
if offset == 0 {
break;
}
reader
.seek(SeekFrom::Start(offset))
.await
.map_err(other_error!(e, "error seek"))?;
let result = reader
.read_until(b'\n', &mut buffer)
.await
.map_err(other_error!(e, "reading from cursor fail"))?;
if result == 1 && pre_buffer.is_some() {
let line = String::from_utf8_lossy(&pre_buffer.unwrap()).into_owned();
match serde_json::from_str::<Log>(&line) {
Err(e) => return Err(other!("unable to parse log msg({}): {}", line, e)),
Ok(log) => {
if log.level == "error" {
rt_msg = log.msg.trim().to_string();
break;
}
}
}
}
pre_buffer = Some(buffer.clone());
buffer.clear();
}
if !rt_msg.is_empty() {
other!("{}: {}", msg, rt_msg)
} else {
other!("{}: (no OCI runtime error in logfile) {}", msg, e)
}
Ok(rt_msg)
}
}
}
Expand Down Expand Up @@ -298,17 +338,17 @@ impl ProcessFactory<ExecProcess> for KuasarExecFactory {
spec: p,
exit_signal: Default::default(),
}),
stdin: Arc::new(Mutex::new(None)),
})
}
}

#[async_trait]
impl ProcessLifecycle<InitProcess> for KuasarInitLifecycle {
// Starts the init process through the OCI runtime, using the process id as
// the runtime id, and marks the process as RUNNING once the call succeeded.
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
self.runtime
.start(p.id.as_str())
.await
.map_err(other_error!(e, "failed start"))?;
// On failure the returned error is built by runtime_error from the bundle
// of this process, the runc error and a fixed description.
if let Err(e) = self.runtime.start(p.id.as_str()).await {
return Err(runtime_error(&p.lifecycle.bundle, e, "OCI runtime start failed").await);
}
p.state = Status::RUNNING;
Ok(())
}
Expand All @@ -319,31 +359,37 @@ impl ProcessLifecycle<InitProcess> for KuasarInitLifecycle {
signal: u32,
all: bool,
) -> containerd_shim::Result<()> {
self.runtime
if let Err(r_err) = self
.runtime
.kill(
p.id.as_str(),
signal,
Some(&runc::options::KillOpts { all }),
)
.await
.map_err(|e| check_kill_error(e.to_string()))
{
let e = runtime_error(&p.lifecycle.bundle, r_err, "OCI runtime kill failed").await;

return Err(check_kill_error(e.to_string()));
}
Ok(())
}

// Force-deletes the init process through the OCI runtime (`force: true`) and
// then signals `exit_signal`.
//
// A runtime failure whose text contains "does not exist" (compared in lower
// case) is not treated as an error; any other failure is returned.
async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
self.runtime
if let Err(e) = self
.runtime
.delete(
p.id.as_str(),
Some(&runc::options::DeleteOpts { force: true }),
)
.await
.or_else(|e| {
if !e.to_string().to_lowercase().contains("does not exist") {
Err(e)
} else {
Ok(())
}
})
.map_err(other_error!(e, "failed delete"))?;
{
// Only failures other than "does not exist" are reported, with the error
// built by runtime_error from the bundle of this process.
if !e.to_string().to_lowercase().contains("does not exist") {
return Err(
runtime_error(&p.lifecycle.bundle, e, "OCI runtime delete failed").await,
);
}
}
self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -416,7 +462,8 @@ impl KuasarInitLifecycle {
impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
async fn start(&self, p: &mut ExecProcess) -> containerd_shim::Result<()> {
rescan_pci_bus().await?;
let pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", &p.id));
let bundle = self.bundle.to_string();
let pid_path = Path::new(&bundle).join(format!("{}.pid", &p.id));
let mut exec_opts = runc::options::ExecOpts {
io: None,
pid_file: Some(pid_path.to_owned()),
Expand All @@ -441,7 +488,7 @@ impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
if let Some(s) = socket {
s.clean().await;
}
return Err(other!("failed to start runc exec: {}", e));
return Err(runtime_error(&bundle, e, "OCI runtime exec failed").await);
}
copy_io_or_console(p, socket, pio, p.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
Expand Down Expand Up @@ -472,8 +519,10 @@ impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
}
}

// Signals `exit_signal` and removes the pid file `<bundle>/<id>.pid` of the
// exec process. Always returns Ok(()).
async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> {
async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
self.exit_signal.signal();
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
// Best effort: a failure to remove the pid file is ignored.
remove_file(exec_pid_path).await.unwrap_or_default();
Ok(())
}

Expand Down Expand Up @@ -613,3 +662,71 @@ pub fn check_kill_error(emsg: String) -> Error {
other!("unknown error after kill {}", emsg)
}
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use containerd_shim::util::{mkdir, write_str_to_file};
    use tokio::fs::remove_dir_all;

    use crate::container::runtime_error;

    /// With a log file present, the reported error must carry the message of
    /// the latest `error`-level entry, even when later entries are at another
    /// level.
    #[tokio::test]
    async fn test_runtime_error_with_logfile() {
        let empty_err = runc::error::Error::NotFound;
        let log_json = "\
        {\"level\":\"info\",\"msg\":\"hello word\",\"time\":\"2022-11-25\"}\n\
        {\"level\":\"error\",\"msg\":\"failed error\",\"time\":\"2022-11-26\"}\n\
        {\"level\":\"error\",\"msg\":\"panic\",\"time\":\"2022-11-27\"}\n\
        {\"level\":\"info\",\"msg\":\"program exit\",\"time\":\"2024-1-24\"}\n\
        ";
        let test_dir = "/tmp/kuasar-test_runtime_error_with_logfile";
        // The directory may be left over from an earlier run; a real problem
        // surfaces when the log file is written below.
        let _ = mkdir(test_dir, 0o711).await;
        let test_log_file = Path::new(test_dir).join("log.json");
        write_str_to_file(test_log_file.as_path(), log_json)
            .await
            .expect("write log json should not be error");

        let expected_msg = "panic";
        let actual_err = runtime_error(
            test_dir,
            empty_err,
            "test_runtime_error_with_logfile failed",
        )
        .await;
        // Clean up before asserting so a failed assertion leaves nothing behind.
        remove_dir_all(test_dir).await.expect("remove test dir");

        let actual_msg = actual_err.to_string();
        assert!(
            actual_msg.contains(expected_msg),
            "actual error \"{}\" should contain \"{}\"",
            actual_msg,
            expected_msg
        );
    }

    /// Without a log file, the reported error must fall back to the text of
    /// the runc error that was passed in.
    #[tokio::test]
    async fn test_runtime_error_without_logfile() {
        let empty_err = runc::error::Error::NotFound;
        let test_dir = "/tmp/kuasar-test_runtime_error_without_logfile";
        let _ = remove_dir_all(test_dir).await;
        assert!(
            !Path::new(test_dir).exists(),
            "{} should not exist",
            test_dir
        );

        let expected_msg = "Unable to locate the runc";
        let actual_err = runtime_error(
            test_dir,
            empty_err,
            "test_runtime_error_without_logfile failed",
        )
        .await;

        let actual_msg = actual_err.to_string();
        assert!(
            actual_msg.contains(expected_msg),
            "actual error \"{}\" should contain \"{}\"",
            actual_msg,
            expected_msg
        );
    }
}
17 changes: 15 additions & 2 deletions vmm/task/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub(crate) async fn copy_io_or_console<P>(
) -> Result<()> {
if p.stdio.terminal {
if let Some(console_socket) = socket {
let console_result = copy_console(&console_socket, &p.stdio, exit_signal).await;
let console_result = copy_console(p, &console_socket, &p.stdio, exit_signal).await;
console_socket.clean().await;
match console_result {
Ok(c) => {
Expand All @@ -144,7 +144,8 @@ pub(crate) async fn copy_io_or_console<P>(
Ok(())
}

async fn copy_console(
async fn copy_console<P>(
p: &ProcessTemplate<P>,
console_socket: &ConsoleSocket,
stdio: &Stdio,
exit_signal: Arc<ExitSignal>,
Expand All @@ -155,6 +156,18 @@ async fn copy_console(
let f = unsafe { File::from_raw_fd(fd) };
if !stdio.stdin.is_empty() {
debug!("copy_console: pipe stdin to console");

let stdin_clone = stdio.stdin.clone();
let stdin_w = p.stdin.clone();
// Open the write side so that the read side unblocks. Opening the write side
// blocks as well, so it is done in a separately spawned task.
tokio::spawn(async move {
if let Ok(stdin_file) = OpenOptions::new().write(true).open(stdin_clone).await {
let mut lock_guard = stdin_w.lock().await;
*lock_guard = Some(stdin_file);
}
});

let console_stdin = f
.try_clone()
.await
Expand Down
Loading