diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 98f9e40f2d..8122f0a4be 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -95,18 +95,15 @@ pub struct Client { } async fn run_with_timeout( - timeout: Duration, + timeout: Option, future: impl futures::Future> + Send, ) -> redis::RedisResult { - if timeout == Duration::from_secs(0) { - // run without timeout - future.await - } else { - // run with timeout - tokio::time::timeout(timeout, future) + match timeout { + Some(duration) => tokio::time::timeout(duration, future) .await .map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into()) - .and_then(|res| res) + .and_then(|res| res), + None => future.await, } } @@ -118,47 +115,70 @@ enum TimeUnit { Seconds = 1, } +// Enumeration representing different request timeout options. +#[derive(Default, PartialEq, Debug)] +enum RequestTimeoutOption { + // Indicates no timeout should be set for the request. + NoTimeout, + // Indicates the request timeout should be based on the client's configured timeout. + #[default] + ClientConfig, + // Indicates the request timeout should be based on the timeout specified in the blocking command. + BlockingCommand(Duration), +} + // Attempts to get the timeout duration from the command argument at `timeout_idx`. -// If the argument can be parsed into a duration, it returns the duration in seconds. Otherwise, it returns None. -fn try_get_timeout_from_cmd_arg( +// If the argument can be parsed into a duration, it returns the duration in seconds with BlockingCmdTimeout. +// If the timeout argument value is zero, NoTimeout will be returned. Otherwise, ClientConfigTimeout is returned. +fn get_timeout_from_cmd_arg( cmd: &Cmd, timeout_idx: usize, time_unit: TimeUnit, -) -> Option { - cmd.arg_idx(timeout_idx).and_then(|timeout_bytes| { - std::str::from_utf8(timeout_bytes) - .ok() - .and_then(|timeout_str| { - timeout_str.parse::().ok().map(|timeout| { - let mut timeout_secs = timeout / ((time_unit as i32) as f64); - if timeout_secs < 0.0 { - // Timeout cannot be negative, return None so the default request timeout will be used and the server will response with error - return None; - } else if timeout_secs > 0.0 { - // Extend the request timeout to ensure we don't timeout before receiving a response from the server. - timeout_secs += BLOCKING_CMD_TIMEOUT_EXTENSION; - }; - Some(Duration::from_secs_f64(timeout_secs)) +) -> RequestTimeoutOption { + cmd.arg_idx(timeout_idx) + .and_then(|timeout_bytes| { + std::str::from_utf8(timeout_bytes) + .ok() + .and_then(|timeout_str| { + timeout_str.parse::().ok().map(|timeout| { + let timeout_secs = timeout / ((time_unit as i32) as f64); + if timeout_secs < 0.0 { + // Timeout cannot be negative, return the client's configured request timeout + RequestTimeoutOption::ClientConfig + } else if timeout_secs == 0.0 { + // `0` means we should set no timeout + RequestTimeoutOption::NoTimeout + } else { + // Extend the request timeout to ensure we don't timeout before receiving a response from the server. + RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( + timeout_secs + BLOCKING_CMD_TIMEOUT_EXTENSION, + )) + } + }) }) - }) - .unwrap_or(None) - }) + }) + .unwrap_or_default() } -fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Duration { +fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Option { let command = cmd.command().unwrap_or_default(); - let blocking_timeout = match command.as_slice() { + let timeout = match command.as_slice() { b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => { - try_get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds) + get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds) } - b"BLMPOP" | b"BZMPOP" => try_get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds), + b"BLMPOP" | b"BZMPOP" => get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds), b"XREAD" | b"XREADGROUP" => cmd .position(b"BLOCK") - .and_then(|idx| try_get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)), - _ => None, + .map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)) + .unwrap_or_default(), + _ => RequestTimeoutOption::ClientConfig, }; - blocking_timeout.unwrap_or(default_timeout) + match timeout { + RequestTimeoutOption::NoTimeout => None, + RequestTimeoutOption::ClientConfig => Some(default_timeout), + RequestTimeoutOption::BlockingCommand(blocking_cmd_duration) => Some(blocking_cmd_duration), + } } impl Client { @@ -246,7 +266,7 @@ impl Client { ) -> redis::RedisFuture<'a, Value> { let command_count = pipeline.cmd_iter().count(); let offset = command_count + 1; - run_with_timeout(self.request_timeout, async move { + run_with_timeout(Some(self.request_timeout), async move { let values = match self.internal_client { ClientWrapper::Standalone(ref mut client) => { client.send_pipeline(pipeline, offset, 1).await @@ -536,17 +556,19 @@ mod tests { use redis::Cmd; - use crate::client::{get_request_timeout, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION}; + use crate::client::{ + get_request_timeout, RequestTimeoutOption, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION, + }; - use super::try_get_timeout_from_cmd_arg; + use super::get_timeout_from_cmd_arg; #[test] fn test_get_timeout_from_cmd_returns_correct_duration_int() { let mut cmd = Cmd::new(); cmd.arg("BLPOP").arg("key1").arg("key2").arg("5"); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), - Some(Duration::from_secs_f64( + get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), + RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( 5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION )) ); @@ -557,8 +579,8 @@ mod tests { let mut cmd = Cmd::new(); cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), - Some(Duration::from_secs_f64( + get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), + RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION )) ); @@ -569,40 +591,40 @@ mod tests { let mut cmd = Cmd::new(); cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key"); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds), - Some(Duration::from_secs_f64( + get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds), + RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION )) ); } #[test] - fn test_get_timeout_from_cmd_returns_none_when_timeout_isnt_passed() { + fn test_get_timeout_from_cmd_returns_default_timeout_when_timeout_isnt_passed() { let mut cmd = Cmd::new(); cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3"); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), - None, + get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), + RequestTimeoutOption::ClientConfig, ); } #[test] - fn test_get_timeout_from_cmd_returns_none_when_timeout_is_negative() { + fn test_get_timeout_from_cmd_returns_default_timeout_when_timeout_is_negative() { let mut cmd = Cmd::new(); cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), - None, + get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), + RequestTimeoutOption::ClientConfig, ); } #[test] - fn test_get_timeout_from_cmd_returns_duration_without_extension_when_zero_is_passed() { + fn test_get_timeout_from_cmd_returns_no_timeout_when_zero_is_passed() { let mut cmd = Cmd::new(); cmd.arg("BLPOP").arg("key1").arg("key2").arg(0); assert_eq!( - try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), - Some(Duration::from_secs(0)), + get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds), + RequestTimeoutOption::NoTimeout, ); } @@ -612,21 +634,27 @@ mod tests { cmd.arg("BLPOP").arg("key1").arg("key2").arg("500"); assert_eq!( get_request_timeout(&cmd, Duration::from_millis(100)), - Duration::from_secs_f64(500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION) + Some(Duration::from_secs_f64( + 500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION + )) ); let mut cmd = Cmd::new(); cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key"); assert_eq!( get_request_timeout(&cmd, Duration::from_millis(100)), - Duration::from_secs_f64(0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION) + Some(Duration::from_secs_f64( + 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION + )) ); let mut cmd = Cmd::new(); cmd.arg("BLMPOP").arg("0.857").arg("key"); assert_eq!( get_request_timeout(&cmd, Duration::from_millis(100)), - Duration::from_secs_f64(0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION) + Some(Duration::from_secs_f64( + 0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION + )) ); } @@ -636,14 +664,14 @@ mod tests { cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500"); assert_eq!( get_request_timeout(&cmd, Duration::from_millis(100)), - Duration::from_millis(100) + Some(Duration::from_millis(100)) ); let mut cmd = Cmd::new(); cmd.arg("XREADGROUP").arg("key"); assert_eq!( get_request_timeout(&cmd, Duration::from_millis(100)), - Duration::from_millis(100) + Some(Duration::from_millis(100)) ); } } diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index c039d347bd..d79a59c574 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -48,7 +48,7 @@ pub(super) struct ReconnectingConnection { async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult { run_with_timeout( - DEFAULT_CONNECTION_ATTEMPT_TIMEOUT, + Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT), client.get_multiplexed_async_connection(), ) .await