Skip to content

Commit

Permalink
vmm: bugfix of blocking task in spawn_blocking
Browse files (browse the repository at this point in the history)
Signed-off-by: Zhang Tianyang <burning9699@gmail.com>
  • Loading branch information
Burning1020 committed Jan 1, 2024
1 parent 4999c99 commit eda0c7c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 48 deletions.
70 changes: 30 additions & 40 deletions vmm/sandbox/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ limitations under the License.
*/

use std::{
os::unix::io::{IntoRawFd, RawFd},
io::{BufRead, BufReader, Write},
os::unix::{
io::{IntoRawFd, RawFd},
net::UnixStream,
},
time::Duration,
};

Expand All @@ -30,17 +34,12 @@ use nix::{
time::{clock_gettime, ClockId},
unistd::close,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
time::timeout,
};
use tokio::time::timeout;
use ttrpc::{context::with_timeout, r#async::Client};
use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};

use crate::network::{NetworkInterface, Route};

const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10;
const TIME_SYNC_PERIOD: u64 = 60;
const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10;

Expand Down Expand Up @@ -68,7 +67,11 @@ async fn new_ttrpc_client(address: &str) -> Result<Client> {

let client = timeout(Duration::from_secs(ctx_timeout), fut)
.await
.map_err(|_| anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err))?;
.map_err(|_| {
let e = anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err);
error!("{}", e);
e
})?;
Ok(client)
}

Expand Down Expand Up @@ -162,41 +165,28 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
if v.len() < 2 {
return Err(anyhow!("hvsock address {} should not less than 2", address).into());
}
(v[0], v[1])
(v[0].to_string(), v[1].to_string())
};

let fut = async {
let mut stream = UnixStream::connect(addr).await?;

match stream.write(format!("CONNECT {}\n", port).as_bytes()).await {
Ok(_) => {
let mut buf = [0; 4096];
match stream.read(&mut buf).await {
Ok(0) => Err(anyhow!("stream closed")),
Ok(n) => {
if String::from_utf8(buf[..n].to_vec())
.unwrap_or_default()
.contains("OK")
{
return Ok(stream.into_std()?.into_raw_fd());
}
Err(anyhow!("failed to connect"))
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
Err(anyhow!("{}", e))
}
Err(e) => Err(anyhow!("failed to read from hvsock: {}", e)),
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Err(anyhow!("{}", e)),
Err(e) => Err(anyhow!("failed to write CONNECT to hvsock: {}", e)),
tokio::task::spawn_blocking(move || {
let mut stream =
UnixStream::connect(&addr).map_err(|e| anyhow!("failed to connect hvsock: {}", e))?;
stream
.write_all(format!("CONNECT {}\n", port).as_bytes())
.map_err(|e| anyhow!("hvsock connected but failed to write CONNECT: {}", e))?;

let mut response = String::new();
BufReader::new(&stream)
.read_line(&mut response)
.map_err(|e| anyhow!("CONNECT sent but failed to get response: {}", e))?;
if response.starts_with("OK") {
Ok(stream.into_raw_fd())
} else {
Err(anyhow!("CONNECT sent but response is not OK: {}", response).into())
}
.map_err(Error::Other)
};

timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut)
.await
.map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))?
})
.await
.map_err(|e| anyhow!("failed to spawn blocking task: {}", e))?
}

pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result<UnixAddr> {
Expand Down
14 changes: 7 additions & 7 deletions vmm/sandbox/src/cloud_hypervisor/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ limitations under the License.

use std::{
os::unix::net::UnixStream,
path::Path,
thread::sleep,
time::{Duration, SystemTime},
};

use anyhow::anyhow;
use api_client::{simple_api_command, simple_api_full_command_with_fds_and_response};
use containerd_sandbox::error::Result;
use log::{error, trace};
use log::error;

use crate::{
cloud_hypervisor::devices::{block::DiskConfig, AddDeviceResponse, RemoveDeviceRequest},
Expand All @@ -38,25 +37,26 @@ pub struct ChClient {
}

impl ChClient {
pub fn new<P: AsRef<Path>>(socket_path: P) -> Result<Self> {
pub async fn new(socket_path: String) -> Result<Self> {
let start_time = SystemTime::now();
loop {
tokio::task::spawn_blocking(move || loop {
match UnixStream::connect(&socket_path) {
Ok(socket) => {
return Ok(Self { socket });
}
Err(e) => {
trace!("failed to create client: {:?}", e);
if start_time.elapsed().unwrap().as_secs()
> CLOUD_HYPERVISOR_START_TIMEOUT_IN_SEC
{
error!("failed to create client: {:?}", e);
error!("failed to connect api server: {:?}", e);
return Err(anyhow!("timeout connect client, {}", e).into());
}
sleep(Duration::from_millis(10));
}
}
}
})
.await
.map_err(|e| anyhow!("failed to spawn a task {}", e))?
}

pub fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<String> {
Expand Down
2 changes: 1 addition & 1 deletion vmm/sandbox/src/cloud_hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CloudHypervisorVM {
}

async fn create_client(&self) -> Result<ChClient> {
ChClient::new(&self.config.api_socket)
ChClient::new(self.config.api_socket.to_string()).await
}

fn get_client(&mut self) -> Result<&mut ChClient> {
Expand Down

0 comments on commit eda0c7c

Please sign in to comment.