Skip to content

Commit

Permalink
Merge pull request #123 from Burning1020/fix-residual
Browse files Browse the repository at this point in the history
bugfix: residual sync clock thread
  • Loading branch information
abel-von authored Apr 8, 2024
2 parents c300c19 + caeb01c commit ee046ce
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 128 deletions.
138 changes: 93 additions & 45 deletions vmm/sandbox/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ limitations under the License.
*/

use std::{
os::unix::io::{IntoRawFd, RawFd},
os::fd::{IntoRawFd, RawFd},
sync::Arc,
time::Duration,
};

use anyhow::anyhow;
use containerd_sandbox::error::{Error, Result};
use log::{debug, error, warn};
use containerd_sandbox::{
error::{Error, Result},
signal::ExitSignal,
};
use log::{debug, error};
use nix::{
sys::{
socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
Expand Down Expand Up @@ -258,59 +262,91 @@ pub(crate) async fn client_update_routes(
Ok(())
}

pub(crate) async fn client_sync_clock(client: &SandboxServiceClient, id: &str) {
pub(crate) fn client_sync_clock(
client: &SandboxServiceClient,
id: &str,
exit_signal: Arc<ExitSignal>,
) {
let id = id.to_string();
let client = client.clone();
let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS).as_nanos() as i64;
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD)).await;
debug!("sync_clock {}: start sync clock from host to guest", id);

let mut req = SyncClockPacket::new();
match clock_gettime(clock_id) {
Ok(ts) => req.ClientSendTime = ts.num_nanoseconds(),
Err(e) => {
warn!("sync_clock {}: failed to get current clock: {}", id, e);
continue;
}
}
match client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &req)
.await
{
Ok(mut p) => {
match clock_gettime(clock_id) {
Ok(ts) => p.ServerArriveTime = ts.num_nanoseconds(),
Err(e) => {
warn!("sync_clock {}: failed to get current clock: {}", id, e);
continue;
}
}
p.Delta = ((p.ClientSendTime - p.ClientArriveTime)
+ (p.ServerArriveTime - p.ServerSendTime))
/ 2;
if p.Delta.abs() > tolerance_nanos {
if let Err(e) = client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &p)
.await
{
error!("sync_clock {}: sync clock set delta failed: {:?}", id, e);
}
}
}
Err(e) => {
error!("sync_clock {}: get error: {:?}", id, e);
let fut = async {
loop {
tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD)).await;
if let Err(e) = do_once_sync_clock(&client, tolerance_nanos).await {
debug!("sync_clock {}: {:?}", id, e);
}
}
};

tokio::select! {
_ = fut => (),
_ = exit_signal.wait() => {},
}
});
}

// Introduce a set of mechanism based on Precision Time Protocol to keep guest clock synchronized
// with host clock periodically.
async fn do_once_sync_clock(
client: &SandboxServiceClient,
tolerance_nanos: Duration,
) -> Result<()> {
let mut req = SyncClockPacket::new();
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
req.ClientSendTime = clock_gettime(clock_id)
.map_err(|e| anyhow!("get current clock: {}", e))?
.num_nanoseconds();

let mut p = client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &req)
.await
.map_err(|e| anyhow!("get guest clock packet: {:?}", e))?;

p.ServerArriveTime = clock_gettime(clock_id)
.map_err(|e| anyhow!("get current clock: {}", e))?
.num_nanoseconds();

p.Delta = checked_compute_delta(
p.ClientSendTime,
p.ClientArriveTime,
p.ServerSendTime,
p.ServerArriveTime,
)?;
if p.Delta.abs() > tolerance_nanos.as_nanos() as i64 {
client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &p)
.await
.map_err(|e| anyhow!("set delta: {:?}", e))?;
}
Ok(())
}

// delta = ((c_send - c_arrive) + (s_arrive - s_send)) / 2
fn checked_compute_delta(c_send: i64, c_arrive: i64, s_send: i64, s_arrive: i64) -> Result<i64> {
let delta_client = c_send
.checked_sub(c_arrive)
.ok_or_else(|| anyhow!("integer overflow {} - {}", c_send, c_arrive))?;

let delta_server = s_arrive
.checked_sub(s_send)
.ok_or_else(|| anyhow!("integer overflow {} - {}", s_arrive, s_send))?;

let delta_sum = delta_client
.checked_add(delta_server)
.ok_or_else(|| anyhow!("integer overflow {} + {}", delta_client, delta_server))?;

let delta = delta_sum
.checked_div(2)
.ok_or_else(|| anyhow!("integer overflow {} / 2", delta_sum))?;

Ok(delta)
}

#[cfg(test)]
mod tests {
use crate::client::new_ttrpc_client_with_timeout;
use crate::client::{checked_compute_delta, new_ttrpc_client_with_timeout};

#[tokio::test]
async fn test_new_ttrpc_client_timeout() {
Expand All @@ -319,4 +355,16 @@ mod tests {
.await
.is_err());
}

#[test]
fn test_checked_compute_delta() {
let c_send = 231;
let c_arrive = 135;
let s_send = 137;
let s_arrive = 298;

let expect_delta = 128;
let actual_delta = checked_compute_delta(c_send, c_arrive, s_send, s_arrive).unwrap();
assert_eq!(expect_delta, actual_delta);
}
}
31 changes: 17 additions & 14 deletions vmm/sandbox/src/cloud_hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::{os::unix::io::RawFd, process::Stdio, time::Duration};
use std::{os::fd::OwnedFd, process::Stdio, time::Duration};

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -72,7 +72,8 @@ pub struct CloudHypervisorVM {
wait_chan: Option<Receiver<(u32, i128)>>,
#[serde(skip)]
client: Option<ChClient>,
fds: Vec<RawFd>,
#[serde(skip)]
fds: Vec<OwnedFd>,
pids: Pids,
}

Expand Down Expand Up @@ -142,7 +143,7 @@ impl CloudHypervisorVM {
Ok(pid)
}

fn append_fd(&mut self, fd: RawFd) -> usize {
fn append_fd(&mut self, fd: OwnedFd) -> usize {
self.fds.push(fd);
self.fds.len() - 1 + 3
}
Expand Down Expand Up @@ -175,17 +176,19 @@ impl VM for CloudHypervisorVM {
params.push("-vv".to_string());
}

let mut cmd = tokio::process::Command::new(&self.config.path);
cmd.args(params.as_slice());

set_cmd_fd(&mut cmd, self.fds.to_vec())?;
set_cmd_netns(&mut cmd, self.netns.to_string())?;
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
info!("start cloud hypervisor with cmdline: {:?}", cmd);
let child = cmd
.spawn()
.map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?;
// Drop cmd immediately to let the fds in pre_exec be closed.
let child = {
let mut cmd = tokio::process::Command::new(&self.config.path);
cmd.args(params.as_slice());

set_cmd_fd(&mut cmd, self.fds.drain(..).collect())?;
set_cmd_netns(&mut cmd, self.netns.to_string())?;
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
info!("start cloud hypervisor with cmdline: {:?}", cmd);
cmd.spawn()
.map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?
};
let pid = child.id();
self.pids.vmm_pid = pid;
let pid_file = format!("{}/pid", self.base_dir);
Expand Down
20 changes: 8 additions & 12 deletions vmm/sandbox/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::os::unix::io::RawFd;
use std::os::fd::OwnedFd;

use containerd_sandbox::error::{Error, Result};

Expand Down Expand Up @@ -182,49 +182,46 @@ impl Transport {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum DeviceInfo {
Block(BlockDeviceInfo),
#[allow(dead_code)]
Tap(TapDeviceInfo),
#[allow(dead_code)]
Physical(PhysicalDeviceInfo),
#[allow(dead_code)]
VhostUser(VhostUserDeviceInfo),
Char(CharDeviceInfo),
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct BlockDeviceInfo {
pub id: String,
pub path: String,
pub read_only: bool,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct TapDeviceInfo {
pub id: String,
pub index: u32,
pub name: String,
pub mac_address: String,
pub fds: Vec<RawFd>,
pub fds: Vec<OwnedFd>,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PhysicalDeviceInfo {
pub id: String,
pub bdf: String,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct VhostUserDeviceInfo {
pub id: String,
pub socket_path: String,
pub mac_address: String,
pub r#type: String,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct CharDeviceInfo {
pub id: String,
pub chardev_id: String,
Expand All @@ -235,6 +232,5 @@ pub struct CharDeviceInfo {
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CharBackendType {
Pipe(String),
#[allow(dead_code)]
Socket(String),
}
7 changes: 4 additions & 3 deletions vmm/sandbox/src/network/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,19 @@ impl NetworkInterface {
Ok(())
}

pub async fn attach_to<V: VM>(&self, sandbox: &mut KuasarSandbox<V>) -> Result<()> {
pub async fn attach_to<V: VM>(&mut self, sandbox: &mut KuasarSandbox<V>) -> Result<()> {
let id = format!("intf-{}", self.index);
match &self.r#type {
LinkType::Veth => {
if let Some(intf) = &self.twin {
if let Some(intf) = self.twin.as_mut() {
sandbox
.vm
.attach(DeviceInfo::Tap(TapDeviceInfo {
id,
index: self.index,
name: intf.name.to_string(),
mac_address: self.mac_address.to_string(),
fds: intf.fds.iter().map(|fd| fd.as_raw_fd()).collect(),
fds: intf.fds.drain(..).collect(),
}))
.await?;
} else {
Expand Down Expand Up @@ -470,6 +470,7 @@ fn get_bdf_for_eth(if_name: &str) -> Result<String> {
e
)
})?;
close(sock).unwrap_or_default();
Ok(bdf.to_string())
}

Expand Down
8 changes: 4 additions & 4 deletions vmm/sandbox/src/qemu/devices/vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::os::unix::io::{IntoRawFd, RawFd};
use std::os::fd::{AsRawFd, OwnedFd};

use anyhow::anyhow;
use containerd_sandbox::error::Result;
Expand Down Expand Up @@ -60,18 +60,18 @@ impl VSockDevice {
}
}

pub async fn find_context_id() -> Result<(RawFd, u64)> {
pub async fn find_context_id() -> Result<(OwnedFd, u64)> {
// TODO make sure if this thread_rng is enough, if we should new a seedable rng everytime.
let vsock_file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.mode(0o666)
.open(VHOST_VSOCK_DEV_PATH)
.await?;
let vsockfd = vsock_file.into_std().await.into_raw_fd();
let vsockfd = OwnedFd::from(vsock_file.into_std().await);
for _i in 0..IOCTL_TRY_TIMES {
let cid = thread_rng().gen_range(3..i32::MAX as u64);
let res = unsafe { set_vhost_guest_cid(vsockfd, &cid) };
let res = unsafe { set_vhost_guest_cid(vsockfd.as_raw_fd(), &cid) };
match res {
Ok(_) => return Ok((vsockfd, cid)),
Err(_) => continue,
Expand Down
Loading

0 comments on commit ee046ce

Please sign in to comment.