Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed blocking command to be timed out based on the specified command argument #1283

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
* Core: Fixed blocking commands to use the specified timeout from the command argument ([#1283](https://github.com/aws/glide-for-redis/pull/1283))

## 0.3.3 (2024-03-28)

Expand Down
282 changes: 273 additions & 9 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::RedisResult;
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::{Cmd, ErrorKind, Value};
use redis::{RedisError, RedisResult};
pub use standalone_client::StandaloneClient;
use std::io;
use std::ops::Deref;
Expand Down Expand Up @@ -95,13 +95,121 @@ pub struct Client {
}

async fn run_with_timeout<T>(
timeout: Duration,
timeout: Option<Duration>,
future: impl futures::Future<Output = RedisResult<T>> + Send,
) -> redis::RedisResult<T> {
tokio::time::timeout(timeout, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res)
match timeout {
Some(duration) => tokio::time::timeout(duration, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res),
None => future.await,
}
}

// Extension to the request timeout for blocking commands to ensure we won't return with timeout error before the server responded
barshaul marked this conversation as resolved.
Show resolved Hide resolved
const BLOCKING_CMD_TIMEOUT_EXTENSION: f64 = 0.5; // seconds
jonathanl-bq marked this conversation as resolved.
Show resolved Hide resolved

enum TimeUnit {
Milliseconds = 1000,
Seconds = 1,
}

// Enumeration representing different request timeout options.
#[derive(Default, PartialEq, Debug)]
enum RequestTimeoutOption {
// Indicates no timeout should be set for the request.
NoTimeout,
// Indicates the request timeout should be based on the client's configured timeout.
#[default]
ClientConfig,
// Indicates the request timeout should be based on the timeout specified in the blocking command.
BlockingCommand(Duration),
}

// Attempts to get the timeout duration from the command argument at `timeout_idx`.
// If the argument can be parsed into a duration, it returns the duration in seconds with BlockingCmdTimeout.
// If the timeout argument value is zero, NoTimeout will be returned. Otherwise, ClientConfigTimeout is returned.
fn get_timeout_from_cmd_arg(
barshaul marked this conversation as resolved.
Show resolved Hide resolved
cmd: &Cmd,
timeout_idx: usize,
time_unit: TimeUnit,
) -> RedisResult<RequestTimeoutOption> {
let create_err = |err_msg| {
RedisError::from((
ErrorKind::ResponseError,
err_msg,
format!(
"Expected to find timeout value at index {:?} for command {:?}. Recieved command = {:?}",
timeout_idx,
std::str::from_utf8(&cmd.command().unwrap_or_default()),
std::str::from_utf8(&cmd.get_packed_command())
),
))
};
cmd.arg_idx(timeout_idx)
.ok_or(create_err("Couldn't find timeout index"))
.and_then(|timeout_bytes| {
barshaul marked this conversation as resolved.
Show resolved Hide resolved
let timeout_str = std::str::from_utf8(timeout_bytes)
.map_err(|_| create_err("Failed to parse the timeout argument to string"))?;
timeout_str
.parse::<f64>()
.map_err(|_| create_err("Failed to parse the timeout argument to f64"))
.and_then(|timeout| {
let timeout_secs = timeout / ((time_unit as i32) as f64);
if timeout_secs < 0.0 {
// Timeout cannot be negative, return the client's configured request timeout
Err(RedisError::from((
ErrorKind::ResponseError,
"Timeout cannot be negative",
format!("Recieved timeout={:?}", timeout_str),
)))
} else if timeout_secs == 0.0 {
// `0` means we should set no timeout
Ok(RequestTimeoutOption::NoTimeout)
} else {
// We limit the maximum timeout due to restrictions imposed by Redis and the Duration crate
if timeout_secs > u32::MAX as f64 {
Err(RedisError::from((
ErrorKind::ResponseError,
"Timeout is out of range, max timeout is 2^32 - 1 (u32::MAX)",
format!("Recieved timeout={:?}", timeout_secs),
)))
} else {
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
Ok(RequestTimeoutOption::BlockingCommand(
Duration::from_secs_f64(
(timeout_secs + BLOCKING_CMD_TIMEOUT_EXTENSION)
.min(u32::MAX as f64),
),
))
}
}
})
})
}

fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Option<Duration>> {
let command = cmd.command().unwrap_or_default();
let timeout = match command.as_slice() {
b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has potential to miss the future commands having timeouts - we should add reference to this function to the "integrating new-commands" SOP.
In addition we should have an automatic test like this (for redis):

  1. pull the target server commands.json
  2. detect commands with the timeout param
  3. check we handle each command correctly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ikolomi Can you create an issue for that?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
}
b"BLMPOP" | b"BZMPOP" => get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
b"XREAD" | b"XREADGROUP" => cmd
.position(b"BLOCK")
.map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds))
.unwrap_or(Ok(RequestTimeoutOption::ClientConfig)),
_ => Ok(RequestTimeoutOption::ClientConfig),
}?;

match timeout {
RequestTimeoutOption::NoTimeout => Ok(None),
RequestTimeoutOption::ClientConfig => Ok(Some(default_timeout)),
RequestTimeoutOption::BlockingCommand(blocking_cmd_duration) => {
Ok(Some(blocking_cmd_duration))
}
}
}

impl Client {
Expand All @@ -111,7 +219,13 @@ impl Client {
routing: Option<RoutingInfo>,
) -> redis::RedisFuture<'a, Value> {
let expected_type = expected_type_for_cmd(cmd);
run_with_timeout(self.request_timeout, async move {
let request_timeout = match get_request_timeout(cmd, self.request_timeout) {
Ok(requet_timeout) => requet_timeout,
barshaul marked this conversation as resolved.
Show resolved Hide resolved
Err(err) => {
return async { Err(err) }.boxed();
}
};
run_with_timeout(request_timeout, async move {
match self.internal_client {
ClientWrapper::Standalone(ref mut client) => client.send_command(cmd).await,

Expand Down Expand Up @@ -189,7 +303,7 @@ impl Client {
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let offset = command_count + 1;
run_with_timeout(self.request_timeout, async move {
run_with_timeout(Some(self.request_timeout), async move {
let values = match self.internal_client {
ClientWrapper::Standalone(ref mut client) => {
client.send_pipeline(pipeline, offset, 1).await
Expand Down Expand Up @@ -472,3 +586,153 @@ impl GlideClientForTests for StandaloneClient {
self.send_command(cmd).boxed()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use redis::Cmd;

use crate::client::{
get_request_timeout, RequestTimeoutOption, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION,
};

use super::get_timeout_from_cmd_arg;

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_int() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("5");
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_float() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds() {
let mut cmd = Cmd::new();
cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key");
let result = get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds);
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_isnt_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3");
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
println!("{:?}", err);
assert!(err.to_string().to_lowercase().contains("index"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_is_larger_than_u32_max() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP")
.arg("key1")
.arg("key2")
.arg(u32::MAX as u64 + 1);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
println!("{:?}", err);
assert!(err.to_string().to_lowercase().contains("u32"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_err_when_timeout_is_negative() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().to_lowercase().contains("negative"), "{err}");
}

#[test]
fn test_get_timeout_from_cmd_returns_no_timeout_when_zero_is_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0);
let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds);
assert!(result.is_ok());
assert_eq!(result.unwrap(), RequestTimeoutOption::NoTimeout,);
}

#[test]
fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("BLMPOP").arg("0.857").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_request_timeout_non_blocking_command_returns_default_timeout() {
let mut cmd = Cmd::new();
cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(Duration::from_millis(100)));

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("key");
let result = get_request_timeout(&cmd, Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(Duration::from_millis(100)));
}
}
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) struct ReconnectingConnection {

async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
DEFAULT_CONNECTION_ATTEMPT_TIMEOUT,
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
client.get_multiplexed_async_connection(),
)
.await
Expand Down
Loading
Loading