Skip to content

Commit

Permalink
Fixed blocking command to be timed out based on cmd arg
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Apr 18, 2024
1 parent 802230c commit 12b9095
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
* Core: Fixed blocking commands to use the specified timeout from the command argument ([#1283](https://github.com/aws/glide-for-redis/pull/1283))

## 0.3.3 (2024-03-28)

Expand Down
282 changes: 273 additions & 9 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::RedisResult;
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::{Cmd, ErrorKind, Value};
use redis::{RedisError, RedisResult};
pub use standalone_client::StandaloneClient;
use std::io;
use std::ops::Deref;
Expand Down Expand Up @@ -95,13 +95,121 @@ pub struct Client {
}

async fn run_with_timeout<T>(
timeout: Duration,
timeout: Option<Duration>,
future: impl futures::Future<Output = RedisResult<T>> + Send,
) -> redis::RedisResult<T> {
tokio::time::timeout(timeout, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res)
match timeout {
Some(duration) => tokio::time::timeout(duration, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res),
None => future.await,
}
}

// Extension to the request timeout for blocking commands to ensure we won't return with timeout error before the server responded
const BLOCKING_CMD_TIMEOUT_EXTENSION: f64 = 0.5; // seconds

enum TimeUnit {
Milliseconds = 1000,
Seconds = 1,
}

// Enumeration representing different request timeout options.
#[derive(Default, PartialEq, Debug)]
enum RequestTimeoutOption {
// Indicates no timeout should be set for the request.
NoTimeout,
// Indicates the request timeout should be based on the client's configured timeout.
#[default]
ClientConfig,
// Indicates the request timeout should be based on the timeout specified in the blocking command.
BlockingCommand(Duration),
}

// Attempts to get the timeout duration from the command argument at `timeout_idx`.
// If the argument can be parsed into a duration, it returns the duration in seconds with BlockingCmdTimeout.
// If the timeout argument value is zero, NoTimeout will be returned. Otherwise, ClientConfigTimeout is returned.
fn get_timeout_from_cmd_arg(
cmd: &Cmd,
timeout_idx: usize,
time_unit: TimeUnit,
) -> RedisResult<RequestTimeoutOption> {
let create_err = |err_msg| {
RedisError::from((
ErrorKind::ResponseError,
err_msg,
format!(
"Expected to find timeout value at index {:?} for command {:?}. Recieved command = {:?}",
timeout_idx,
std::str::from_utf8(&cmd.command().unwrap_or_default()),
std::str::from_utf8(&cmd.get_packed_command())
),
))
};
cmd.arg_idx(timeout_idx)
.ok_or(create_err("Couldn't find timeout index"))
.and_then(|timeout_bytes| {
let timeout_str = std::str::from_utf8(timeout_bytes)
.map_err(|_| create_err("Failed to parse the timeout argument to string"))?;
timeout_str
.parse::<f64>()
.map_err(|_| create_err("Failed to parse the timeout argument to f64"))
.and_then(|timeout| {
let timeout_secs = timeout / ((time_unit as i32) as f64);
if timeout_secs < 0.0 {
// Timeout cannot be negative, return the client's configured request timeout
Err(RedisError::from((
ErrorKind::ResponseError,
"Timeout cannot be negative",
format!("Recieved timeout={:?}", timeout_str),
)))
} else if timeout_secs == 0.0 {
// `0` means we should set no timeout
Ok(RequestTimeoutOption::NoTimeout)
} else {
// We limit the maximum timeout due to restrictions imposed by Redis and the Duration crate
if timeout_secs > u32::MAX as f64 {
Err(RedisError::from((
ErrorKind::ResponseError,
"Timeout is out of range, max timeout is 2^32 - 1 (u32::MAX)",
format!("Recieved timeout={:?}", timeout_secs),
)))
} else {
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
Ok(RequestTimeoutOption::BlockingCommand(
Duration::from_secs_f64(
(timeout_secs + BLOCKING_CMD_TIMEOUT_EXTENSION)
.min(u32::MAX as f64),
),
))
}
}
})
})
}

fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Option<Duration>> {
let command = cmd.command().unwrap_or_default();
let timeout = match command.as_slice() {
b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
}
b"BLMPOP" | b"BZMPOP" => get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
b"XREAD" | b"XREADGROUP" => cmd
.position(b"BLOCK")
.map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds))
.unwrap_or(Ok(RequestTimeoutOption::ClientConfig)),
_ => Ok(RequestTimeoutOption::ClientConfig),
}?;

match timeout {
RequestTimeoutOption::NoTimeout => Ok(None),
RequestTimeoutOption::ClientConfig => Ok(Some(default_timeout)),
RequestTimeoutOption::BlockingCommand(blocking_cmd_duration) => {
Ok(Some(blocking_cmd_duration))
}
}
}

impl Client {
Expand All @@ -111,7 +219,13 @@ impl Client {
routing: Option<RoutingInfo>,
) -> redis::RedisFuture<'a, Value> {
let expected_type = expected_type_for_cmd(cmd);
run_with_timeout(self.request_timeout, async move {
let request_timeout = match get_request_timeout(cmd, self.request_timeout) {
Ok(requet_timeout) => requet_timeout,
Err(err) => {
return async { Err(err) }.boxed();
}
};
run_with_timeout(request_timeout, async move {
match self.internal_client {
ClientWrapper::Standalone(ref mut client) => client.send_command(cmd).await,

Expand Down Expand Up @@ -189,7 +303,7 @@ impl Client {
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let offset = command_count + 1;
run_with_timeout(self.request_timeout, async move {
run_with_timeout(Some(self.request_timeout), async move {
let values = match self.internal_client {
ClientWrapper::Standalone(ref mut client) => {
client.send_pipeline(pipeline, offset, 1).await
Expand Down Expand Up @@ -472,3 +586,153 @@ impl GlideClientForTests for StandaloneClient {
self.send_command(cmd).boxed()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use redis::Cmd;

use crate::client::{
get_request_timeout, RequestTimeoutOption, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION,
};

use super::get_timeout_from_cmd_arg;

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_int() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("5");
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_float() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds() {
let mut cmd = Cmd::new();
cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key");
let result = get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_isnt_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3");
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
println!("{:?}", err);
assert!(err.to_string().to_lowercase().contains("index"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_is_larger_than_u32_max() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP")
.arg("key1")
.arg("key2")
.arg(u32::MAX as u64 + 1);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
println!("{:?}", err);
assert!(err.to_string().to_lowercase().contains("u32"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_is_negative() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().to_lowercase().contains("negative"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_no_timeout_when_zero_is_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(result.unwrap(), RequestTimeoutOption::NoTimeout,);
}

#[test]
fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("BLMPOP").arg("0.857").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_request_timeout_non_blocking_command_returns_default_timeout() {
let mut cmd = Cmd::new();
cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(Duration::from_millis(100)));

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(Duration::from_millis(100)));
}
}
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) struct ReconnectingConnection {

async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
DEFAULT_CONNECTION_ATTEMPT_TIMEOUT,
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
client.get_multiplexed_async_connection(),
)
.await
Expand Down
Loading

0 comments on commit 12b9095

Please sign in to comment.