Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Apr 17, 2024
1 parent c3a67f1 commit 918d82f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 59 deletions.
144 changes: 86 additions & 58 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,15 @@ pub struct Client {
}

async fn run_with_timeout<T>(
timeout: Duration,
timeout: Option<Duration>,
future: impl futures::Future<Output = RedisResult<T>> + Send,
) -> redis::RedisResult<T> {
if timeout == Duration::from_secs(0) {
// run without timeout
future.await
} else {
// run with timeout
tokio::time::timeout(timeout, future)
match timeout {
Some(duration) => tokio::time::timeout(duration, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res)
.and_then(|res| res),
None => future.await,
}
}

Expand All @@ -118,47 +115,70 @@ enum TimeUnit {
Seconds = 1,
}

// Enumeration representing different request timeout options.
#[derive(Default, PartialEq, Debug)]
enum RequestTimeoutOption {
// Indicates no timeout should be set for the request.
NoTimeout,
// Indicates the request timeout should be based on the client's configured timeout.
#[default]
ClientConfig,
// Indicates the request timeout should be based on the timeout specified in the blocking command.
BlockingCommand(Duration),
}

// Attempts to get the timeout duration from the command argument at `timeout_idx`.
// If the argument can be parsed into a duration, it returns the duration in seconds. Otherwise, it returns None.
fn try_get_timeout_from_cmd_arg(
// If the argument can be parsed into a duration, it returns the duration in seconds with BlockingCmdTimeout.
// If the timeout argument value is zero, NoTimeout will be returned. Otherwise, ClientConfigTimeout is returned.
fn get_timeout_from_cmd_arg(
cmd: &Cmd,
timeout_idx: usize,
time_unit: TimeUnit,
) -> Option<Duration> {
cmd.arg_idx(timeout_idx).and_then(|timeout_bytes| {
std::str::from_utf8(timeout_bytes)
.ok()
.and_then(|timeout_str| {
timeout_str.parse::<f64>().ok().map(|timeout| {
let mut timeout_secs = timeout / ((time_unit as i32) as f64);
if timeout_secs < 0.0 {
// Timeout cannot be negative, return None so the default request timeout will be used and the server will response with error
return None;
} else if timeout_secs > 0.0 {
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
timeout_secs += BLOCKING_CMD_TIMEOUT_EXTENSION;
};
Some(Duration::from_secs_f64(timeout_secs))
) -> RequestTimeoutOption {
cmd.arg_idx(timeout_idx)
.and_then(|timeout_bytes| {
std::str::from_utf8(timeout_bytes)
.ok()
.and_then(|timeout_str| {
timeout_str.parse::<f64>().ok().map(|timeout| {
let timeout_secs = timeout / ((time_unit as i32) as f64);
if timeout_secs < 0.0 {
// Timeout cannot be negative, return the client's configured request timeout
RequestTimeoutOption::ClientConfig
} else if timeout_secs == 0.0 {
// `0` means we should set no timeout
RequestTimeoutOption::NoTimeout
} else {
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
timeout_secs + BLOCKING_CMD_TIMEOUT_EXTENSION,
))
}
})
})
})
.unwrap_or(None)
})
})
.unwrap_or_default()
}

fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Duration {
fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Option<Duration> {
let command = cmd.command().unwrap_or_default();
let blocking_timeout = match command.as_slice() {
let timeout = match command.as_slice() {
b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
try_get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
}
b"BLMPOP" | b"BZMPOP" => try_get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
b"BLMPOP" | b"BZMPOP" => get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
b"XREAD" | b"XREADGROUP" => cmd
.position(b"BLOCK")
.and_then(|idx| try_get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)),
_ => None,
.map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds))
.unwrap_or_default(),
_ => RequestTimeoutOption::ClientConfig,
};

blocking_timeout.unwrap_or(default_timeout)
match timeout {
RequestTimeoutOption::NoTimeout => None,
RequestTimeoutOption::ClientConfig => Some(default_timeout),
RequestTimeoutOption::BlockingCommand(blocking_cmd_duration) => Some(blocking_cmd_duration),
}
}

impl Client {
Expand Down Expand Up @@ -246,7 +266,7 @@ impl Client {
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let offset = command_count + 1;
run_with_timeout(self.request_timeout, async move {
run_with_timeout(Some(self.request_timeout), async move {
let values = match self.internal_client {
ClientWrapper::Standalone(ref mut client) => {
client.send_pipeline(pipeline, offset, 1).await
Expand Down Expand Up @@ -536,17 +556,19 @@ mod tests {

use redis::Cmd;

use crate::client::{get_request_timeout, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION};
use crate::client::{
get_request_timeout, RequestTimeoutOption, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION,
};

use super::try_get_timeout_from_cmd_arg;
use super::get_timeout_from_cmd_arg;

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_int() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("5");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
Some(Duration::from_secs_f64(
get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
Expand All @@ -557,8 +579,8 @@ mod tests {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5);
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
Some(Duration::from_secs_f64(
get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
Expand All @@ -569,40 +591,40 @@ mod tests {
let mut cmd = Cmd::new();
cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds),
Some(Duration::from_secs_f64(
get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_none_when_timeout_isnt_passed() {
fn test_get_timeout_from_cmd_returns_default_timeout_when_timeout_isnt_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
None,
get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
RequestTimeoutOption::ClientConfig,
);
}

#[test]
fn test_get_timeout_from_cmd_returns_none_when_timeout_is_negative() {
fn test_get_timeout_from_cmd_returns_default_timeout_when_timeout_is_negative() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1);
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
None,
get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
RequestTimeoutOption::ClientConfig,
);
}

#[test]
fn test_get_timeout_from_cmd_returns_duration_without_extension_when_zero_is_passed() {
fn test_get_timeout_from_cmd_returns_no_timeout_when_zero_is_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0);
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
Some(Duration::from_secs(0)),
get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
RequestTimeoutOption::NoTimeout,
);
}

Expand All @@ -612,21 +634,27 @@ mod tests {
cmd.arg("BLPOP").arg("key1").arg("key2").arg("500");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION)
Some(Duration::from_secs_f64(
500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION)
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("BLMPOP").arg("0.857").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION)
Some(Duration::from_secs_f64(
0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

Expand All @@ -636,14 +664,14 @@ mod tests {
cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_millis(100)
Some(Duration::from_millis(100))
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_millis(100)
Some(Duration::from_millis(100))
);
}
}
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) struct ReconnectingConnection {

async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
DEFAULT_CONNECTION_ATTEMPT_TIMEOUT,
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
client.get_multiplexed_async_connection(),
)
.await
Expand Down

0 comments on commit 918d82f

Please sign in to comment.