Skip to content

Commit

Permalink
fix timeout issue in the CLI
Browse files Browse the repository at this point in the history
create Channel::set_timeout()
  • Loading branch information
Keksoj committed Jan 16, 2024
1 parent 9d5788c commit c26fc36
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
1 change: 1 addition & 0 deletions bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn ctl(args: cli::Args) -> anyhow::Result<()> {
})?;

let timeout = Duration::from_millis(args.timeout.unwrap_or(config.ctl_command_timeout));
info!("applying timeout {:?}", timeout);

let mut command_manager = CommandManager {
channel,
Expand Down
52 changes: 38 additions & 14 deletions command/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub enum ChannelError {
InvalidCharSet(String),
#[error("Error deserializing message")]
Serde(serde_json::error::Error),
#[error("Could not change the blocking status ef the unix stream with file descriptor {fd}: {error}")]
#[error("could not set the timeout of the unix stream with file descriptor {fd}: {error}")]
SetTimeout { fd: i32, error: String },
#[error("Could not change the blocking status of the unix stream with file descriptor {fd}: {error}")]
BlockingStatus { fd: i32, error: String },
#[error("Connection error: {0:?}")]
Connection(Option<std::io::Error>),
Expand Down Expand Up @@ -120,6 +122,22 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
Ok(())
}

/// set the read_timeout of the unix stream. This works only temporary, be sure to set the timeout to None afterwards.
fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<(), ChannelError> {
unsafe {
let fd = self.sock.as_raw_fd();
let stream = StdUnixStream::from_raw_fd(fd);
stream
.set_read_timeout(timeout)
.map_err(|error| ChannelError::SetTimeout {
fd,
error: error.to_string(),
})?;
let _fd = stream.into_raw_fd();
}
Ok(())
}

/// set the channel to be blocking
pub fn blocking(&mut self) -> Result<(), ChannelError> {
self.set_nonblocking(false)
Expand Down Expand Up @@ -288,35 +306,41 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
) -> Result<Rx, ChannelError> {
let now = std::time::Instant::now();

loop {
// set a very small timeout, to repeat the loop often
self.set_timeout(Some(Duration::from_millis(10)))?;

let status = loop {
if let Some(timeout) = timeout {
if now.elapsed() >= timeout {
return Err(ChannelError::TimeoutReached(timeout));
break Err(ChannelError::TimeoutReached(timeout));
}
}

match self.front_buf.data().iter().position(|&x| x == 0) {
Some(position) => return self.read_and_parse_from_front_buffer(position),
Some(position) => break self.read_and_parse_from_front_buffer(position),
None => {
if self.front_buf.available_space() == 0 {
if self.front_buf.capacity() == self.max_buffer_size {
return Err(ChannelError::BufferFull);
break Err(ChannelError::BufferFull);
}
let new_size = min(self.front_buf.capacity() + 5000, self.max_buffer_size);
self.front_buf.grow(new_size);
}

match self
.sock
.read(self.front_buf.space())
.map_err(ChannelError::Read)?
{
0 => return Err(ChannelError::NoByteToRead),
bytes_read => self.front_buf.fill(bytes_read),
match self.sock.read(self.front_buf.space()) {
Ok(0) => break Err(ChannelError::NoByteToRead),
Ok(bytes_read) => self.front_buf.fill(bytes_read),
Err(io_error) => match io_error.kind() {
ErrorKind::WouldBlock => continue, // ignore 10 millisecond timeouts
_ => break Err(ChannelError::Read(io_error)),
},
};
}
}
}
};

self.set_timeout(None)?;

status
}

fn read_and_parse_from_front_buffer(&mut self, position: usize) -> Result<Rx, ChannelError> {
Expand Down

0 comments on commit c26fc36

Please sign in to comment.