Skip to content

Commit

Permalink
Fix blocking commands tests in wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Apr 16, 2024
1 parent 802230c commit 26cbe61
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
* Core: Fixed blocking commands to use the specified timeout from the command argument

## 0.3.3 (2024-03-28)

Expand Down
167 changes: 161 additions & 6 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::RedisResult;
use redis::{Cmd, ErrorKind, Value};
pub use standalone_client::StandaloneClient;
Expand Down Expand Up @@ -98,10 +98,67 @@ async fn run_with_timeout<T>(
timeout: Duration,
future: impl futures::Future<Output = RedisResult<T>> + Send,
) -> redis::RedisResult<T> {
tokio::time::timeout(timeout, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res)
if timeout == Duration::from_secs(0) {
// run without timeout
future.await
} else {
// run with timeout
tokio::time::timeout(timeout, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res)
}
}

// Extension to the request timeout for blocking commands to ensure we won't return with timeout error before the server responded
const BLOCKING_CMD_TIMEOUT_EXTENSION: f64 = 0.5; // seconds

enum TimeUnit {
Milliseconds = 1000,
Seconds = 1,
}

// Attempts to get the timeout duration from the command argument at `timeout_idx`.
// If the argument can be parsed into a duration, it returns the duration in seconds. Otherwise, it returns None.
fn try_get_timeout_from_cmd_arg(
cmd: &Cmd,
timeout_idx: usize,
time_unit: TimeUnit,
) -> Option<Duration> {
cmd.arg_idx(timeout_idx).and_then(|timeout_bytes| {
std::str::from_utf8(timeout_bytes)
.ok()
.and_then(|timeout_str| {
timeout_str.parse::<f64>().ok().map(|timeout| {
let mut timeout_secs = timeout / ((time_unit as i32) as f64);
if timeout_secs < 0.0 {
// Timeout cannot be negative, return None so the default request timeout will be used and the server will response with error
return None;
} else if timeout_secs > 0.0 {
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
timeout_secs += BLOCKING_CMD_TIMEOUT_EXTENSION;
};
Some(Duration::from_secs_f64(timeout_secs))
})
})
.unwrap_or(None)
})
}

fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Duration {
let command = cmd.command().unwrap_or_default();
let blocking_timeout = match command.as_slice() {
b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
try_get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
}
b"BLMPOP" | b"BZMPOP" => try_get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
b"XREAD" | b"XREADGROUP" => cmd
.position(b"BLOCK")
.and_then(|idx| try_get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)),
_ => None,
};

blocking_timeout.unwrap_or(default_timeout)
}

impl Client {
Expand All @@ -111,7 +168,7 @@ impl Client {
routing: Option<RoutingInfo>,
) -> redis::RedisFuture<'a, Value> {
let expected_type = expected_type_for_cmd(cmd);
run_with_timeout(self.request_timeout, async move {
run_with_timeout(get_request_timeout(cmd, self.request_timeout), async move {
match self.internal_client {
ClientWrapper::Standalone(ref mut client) => client.send_command(cmd).await,

Expand Down Expand Up @@ -472,3 +529,101 @@ impl GlideClientForTests for StandaloneClient {
self.send_command(cmd).boxed()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use redis::Cmd;

use crate::client::{get_request_timeout, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION};

use super::try_get_timeout_from_cmd_arg;

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_int() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("5");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
Some(Duration::from_secs_f64(
5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_float() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5);
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds() {
let mut cmd = Cmd::new();
cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
fn test_get_timeout_from_cmd_returns_none_when_timeout_isnt_passed() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3");
assert_eq!(
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
None,
);
}

#[test]
fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout() {
let mut cmd = Cmd::new();
cmd.arg("BLPOP").arg("key1").arg("key2").arg("500");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION)
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION)
);

let mut cmd = Cmd::new();
cmd.arg("BLMPOP").arg("0.857").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_secs_f64(0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION)
);
}

#[test]
fn test_get_request_timeout_non_blocking_command_returns_default_timeout() {
let mut cmd = Cmd::new();
cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_millis(100)
);

let mut cmd = Cmd::new();
cmd.arg("XREADGROUP").arg("key");
assert_eq!(
get_request_timeout(&cmd, Duration::from_millis(100)),
Duration::from_millis(100)
);
}
}
99 changes: 96 additions & 3 deletions glide-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod utilities;
#[cfg(test)]
pub(crate) mod shared_client_tests {
use super::*;
use glide_core::client::Client;
use glide_core::client::{Client, DEFAULT_RESPONSE_TIMEOUT};
use redis::{
cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
FromRedisValue, InfoDict, RedisConnectionInfo, Value,
Expand Down Expand Up @@ -316,6 +316,45 @@ pub(crate) mod shared_client_tests {
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_request_timeout(#[values(false, true)] use_cluster: bool) {
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
shared_server: false,
..Default::default()
},
)
.await;
let mut cmd = redis::Cmd::new();
// Create a long running command to ensure we get into timeout
cmd.arg("EVAL")
.arg(
r#"
local i = 0
while (true)
do
redis.call('ping')
i = i + 1
end
"#,
)
.arg("0");
let result = test_basics.client.send_command(&cmd, None).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_timeout(), "{err}");
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_blocking_command_doesnt_raise_timeout_error(#[values(false, true)] use_cluster: bool) {
// We test that the request timeout is based on the value specified in the blocking command argument,
// and not on the one set in the client configuration. To achieve this, we execute a command designed to
// be blocked until it reaches the specified command timeout. We set the client's request timeout to
// a shorter duration than the blocking command's timeout. Subsequently, we confirm that we receive
// a response from the server instead of encountering a timeout error.
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
Expand All @@ -328,11 +367,65 @@ pub(crate) mod shared_client_tests {
.await;

let mut cmd = redis::Cmd::new();
cmd.arg("BLPOP").arg("foo").arg(0); // 0 timeout blocks indefinitely
cmd.arg("BLPOP").arg("foo").arg(0.3); // server should return null after 300 millisecond
let result = test_basics.client.send_command(&cmd, None).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), Value::Nil);
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_blocking_command_with_zero_timeout_blocks_indefinitely(
#[values(false, true)] use_cluster: bool,
) {
// We test that when blocking command is passed with 0 as the timeout duration, the command will be blocked indefinitely
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
shared_server: true,
..Default::default()
},
)
.await;
let mut cmd = redis::Cmd::new();
cmd.arg("BLPOP").arg("foo").arg(-1);
let result = test_basics.client.send_command(&cmd, None).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_timeout(), "{err}");
println!("{:?}", err);
assert_eq!(err.kind(), redis::ErrorKind::ResponseError);
assert!(err.to_string().contains("timeout is negative"));
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_blocking_command_with_negative_timeout_returns_error(
#[values(false, true)] use_cluster: bool,
) {
// We test that when blocking command is passed with a negative timeout the command will return with an error
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
shared_server: true,
..Default::default()
},
)
.await;
let future = async move {
let mut cmd = redis::Cmd::new();
cmd.arg("BLPOP").arg("foo").arg(0); // `0` should block indefinitely
test_basics.client.send_command(&cmd, None).await
};
// We execute the command with Tokio's timeout wrapper to prevent the test from hanging indefinitely.
let tokio_timeout_result =
tokio::time::timeout(DEFAULT_RESPONSE_TIMEOUT * 2, future).await;
assert!(tokio_timeout_result.is_err());
});
}

Expand Down
14 changes: 4 additions & 10 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1813,15 +1813,12 @@ public void brpop(BaseClient client) {

// nothing popped out
assertNull(
client
.brpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.001)
.get());
client.brpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.5).get());

// Key exists, but it is not a list
assertEquals(OK, client.set("foo", "bar").get());
ExecutionException executionException =
assertThrows(
ExecutionException.class, () -> client.brpop(new String[] {"foo"}, .0001).get());
assertThrows(ExecutionException.class, () -> client.brpop(new String[] {"foo"}, 0.5).get());
assertTrue(executionException.getCause() instanceof RequestException);
}

Expand Down Expand Up @@ -1866,15 +1863,12 @@ public void blpop(BaseClient client) {

// nothing popped out
assertNull(
client
.blpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.001)
.get());
client.blpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.5).get());

// Key exists, but it is not a list
assertEquals(OK, client.set("foo", "bar").get());
ExecutionException executionException =
assertThrows(
ExecutionException.class, () -> client.blpop(new String[] {"foo"}, .0001).get());
assertThrows(ExecutionException.class, () -> client.blpop(new String[] {"foo"}, 0.5).get());
assertTrue(executionException.getCause() instanceof RequestException);
}

Expand Down
4 changes: 2 additions & 2 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2041,14 +2041,14 @@ export function runBaseTests<Context>(config: {
await client.rpush("brpop-test", ["foo", "bar", "baz"]),
).toEqual(3);
// Test basic usage
expect(await client.brpop(["brpop-test"], 0.1)).toEqual([
expect(await client.brpop(["brpop-test"], 0.5)).toEqual([
"brpop-test",
"baz",
]);
// Delete all values from list
expect(await client.del(["brpop-test"])).toEqual(1);
// Test null return when key doesn't exist
expect(await client.brpop(["brpop-test"], 0.1)).toEqual(null);
expect(await client.brpop(["brpop-test"], 0.5)).toEqual(null);
}, protocol);
},
config.timeout,
Expand Down
3 changes: 3 additions & 0 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async def create_client(
addresses: Optional[List[NodeAddress]] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
request_timeout: Optional[int] = None,
) -> Union[RedisClient, RedisClusterClient]:
# Create async socket client
use_tls = request.config.getoption("--tls")
Expand All @@ -124,6 +125,7 @@ async def create_client(
credentials=credentials,
client_name=client_name,
protocol=protocol,
request_timeout=request_timeout,
)
return await RedisClusterClient.create(cluster_config)
else:
Expand All @@ -137,5 +139,6 @@ async def create_client(
database_id=database_id,
client_name=client_name,
protocol=protocol,
request_timeout=request_timeout,
)
return await RedisClient.create(config)
Loading

0 comments on commit 26cbe61

Please sign in to comment.