diff --git a/vmm/sandbox/src/client.rs b/vmm/sandbox/src/client.rs index de1a5411..fb76c67f 100644 --- a/vmm/sandbox/src/client.rs +++ b/vmm/sandbox/src/client.rs @@ -15,7 +15,11 @@ limitations under the License. */ use std::{ - os::unix::io::{IntoRawFd, RawFd}, + io::{BufRead, BufReader, Write}, + os::unix::{ + io::{IntoRawFd, RawFd}, + net::UnixStream, + }, time::Duration, }; @@ -30,17 +34,12 @@ use nix::{ time::{clock_gettime, ClockId}, unistd::close, }; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::UnixStream, - time::timeout, -}; +use tokio::time::timeout; use ttrpc::{context::with_timeout, r#async::Client}; use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient}; use crate::network::{NetworkInterface, Route}; -const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10; const TIME_SYNC_PERIOD: u64 = 60; const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10; @@ -68,7 +67,11 @@ async fn new_ttrpc_client(address: &str) -> Result { let client = timeout(Duration::from_secs(ctx_timeout), fut) .await - .map_err(|_| anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err))?; + .map_err(|_| { + let e = anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err); + error!("{}", e); + e + })?; Ok(client) } @@ -162,41 +165,28 @@ async fn connect_to_hvsocket(address: &str) -> Result { if v.len() < 2 { return Err(anyhow!("hvsock address {} should not less than 2", address).into()); } - (v[0], v[1]) + (v[0].to_string(), v[1].to_string()) }; - let fut = async { - let mut stream = UnixStream::connect(addr).await?; - - match stream.write(format!("CONNECT {}\n", port).as_bytes()).await { - Ok(_) => { - let mut buf = [0; 4096]; - match stream.read(&mut buf).await { - Ok(0) => Err(anyhow!("stream closed")), - Ok(n) => { - if String::from_utf8(buf[..n].to_vec()) - .unwrap_or_default() - .contains("OK") - { - return Ok(stream.into_std()?.into_raw_fd()); - } - Err(anyhow!("failed to connect")) - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - Err(anyhow!("{}", e)) - } - Err(e) => Err(anyhow!("failed to read from hvsock: {}", e)), - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Err(anyhow!("{}", e)), - Err(e) => Err(anyhow!("failed to write CONNECT to hvsock: {}", e)), + tokio::task::spawn_blocking(move || { + let mut stream = + UnixStream::connect(&addr).map_err(|e| anyhow!("failed to connect hvsock: {}", e))?; + stream + .write_all(format!("CONNECT {}\n", port).as_bytes()) + .map_err(|e| anyhow!("hvsock connected but failed to write CONNECT: {}", e))?; + + let mut response = String::new(); + BufReader::new(&stream) + .read_line(&mut response) + .map_err(|e| anyhow!("CONNECT sent but failed to get response: {}", e))?; + if response.starts_with("OK") { + Ok(stream.into_raw_fd()) + } else { + Err(anyhow!("CONNECT sent but response is not OK: {}", response).into()) } - .map_err(Error::Other) - }; - - timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut) - .await - .map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))? + }) + .await + .map_err(|e| anyhow!("failed to spawn blocking task: {}", e))? } pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result { diff --git a/vmm/sandbox/src/cloud_hypervisor/client.rs b/vmm/sandbox/src/cloud_hypervisor/client.rs index b00ab91a..4771ca94 100644 --- a/vmm/sandbox/src/cloud_hypervisor/client.rs +++ b/vmm/sandbox/src/cloud_hypervisor/client.rs @@ -16,7 +16,6 @@ limitations under the License. use std::{ os::unix::net::UnixStream, - path::Path, thread::sleep, time::{Duration, SystemTime}, }; @@ -24,7 +23,7 @@ use std::{ use anyhow::anyhow; use api_client::{simple_api_command, simple_api_full_command_with_fds_and_response}; use containerd_sandbox::error::Result; -use log::{error, trace}; +use log::error; use crate::{ cloud_hypervisor::devices::{block::DiskConfig, AddDeviceResponse, RemoveDeviceRequest}, @@ -38,25 +37,26 @@ pub struct ChClient { } impl ChClient { - pub fn new>(socket_path: P) -> Result { + pub async fn new(socket_path: String) -> Result { let start_time = SystemTime::now(); - loop { + tokio::task::spawn_blocking(move || loop { match UnixStream::connect(&socket_path) { Ok(socket) => { return Ok(Self { socket }); } Err(e) => { - trace!("failed to create client: {:?}", e); if start_time.elapsed().unwrap().as_secs() > CLOUD_HYPERVISOR_START_TIMEOUT_IN_SEC { - error!("failed to create client: {:?}", e); + error!("failed to connect api server: {:?}", e); return Err(anyhow!("timeout connect client, {}", e).into()); } sleep(Duration::from_millis(10)); } } - } + }) + .await + .map_err(|e| anyhow!("failed to spawn a task {}", e))? } pub fn hot_attach(&mut self, device_info: DeviceInfo) -> Result { diff --git a/vmm/sandbox/src/cloud_hypervisor/mod.rs b/vmm/sandbox/src/cloud_hypervisor/mod.rs index 0478b2f5..41dea8b2 100644 --- a/vmm/sandbox/src/cloud_hypervisor/mod.rs +++ b/vmm/sandbox/src/cloud_hypervisor/mod.rs @@ -110,7 +110,7 @@ impl CloudHypervisorVM { } async fn create_client(&self) -> Result { - ChClient::new(&self.config.api_socket) + ChClient::new(self.config.api_socket.to_string()).await } fn get_client(&mut self) -> Result<&mut ChClient> {