Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Apr 18, 2024
1 parent 27cc79e commit 2558dd0
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 90 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
* Core: Fixed blocking commands to use the specified timeout from the command argument
* Core: Fixed blocking commands to use the specified timeout from the command argument ([#1283](https://github.com/aws/glide-for-redis/pull/1283))

## 0.3.3 (2024-03-28)

Expand Down
119 changes: 78 additions & 41 deletions glide-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod shared_client_tests {
use glide_core::client::{Client, DEFAULT_RESPONSE_TIMEOUT};
use redis::{
cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
cluster_topology::get_slot,
FromRedisValue, InfoDict, RedisConnectionInfo, Value,
};
use rstest::rstest;
Expand All @@ -21,6 +22,30 @@ pub(crate) mod shared_client_tests {
client: Client,
}

async fn create_client(server: &BackingServer, configuration: TestConfiguration) -> Client {
match server {
BackingServer::Standalone(server) => {
let connection_addr = server
.as_ref()
.map(|server| server.get_client_addr())
.unwrap_or(get_shared_server_address(configuration.use_tls));

// TODO - this is a patch, handling the situation where the new server
// still isn't available to connection. This should be fixed in [RedisServer].
repeat_try_create(|| async {
Client::new(
create_connection_request(&[connection_addr.clone()], &configuration)
.into(),
)
.await
.ok()
})
.await
}
BackingServer::Cluster(cluster) => create_cluster_client(cluster, configuration).await,
}
}

async fn setup_test_basics(use_cluster: bool, configuration: TestConfiguration) -> TestBasics {
if use_cluster {
let cluster_basics = cluster::setup_test_basics_internal(configuration).await;
Expand All @@ -30,28 +55,9 @@ pub(crate) mod shared_client_tests {
}
} else {
let test_basics = utilities::setup_test_basics_internal(&configuration).await;

let connection_addr = test_basics
.server
.as_ref()
.map(|server| server.get_client_addr())
.unwrap_or(get_shared_server_address(configuration.use_tls));

// TODO - this is a patch, handling the situation where the new server
// still isn't available to connection. This should be fixed in [RedisServer].
let client = repeat_try_create(|| async {
Client::new(
create_connection_request(&[connection_addr.clone()], &configuration).into(),
)
.await
.ok()
})
.await;

TestBasics {
server: BackingServer::Standalone(test_basics.server),
client,
}
let server = BackingServer::Standalone(test_basics.server);
let client = create_client(&server, configuration).await;
TestBasics { server, client }
}
}

Expand Down Expand Up @@ -320,7 +326,7 @@ pub(crate) mod shared_client_tests {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
request_timeout: Some(1), // milliseconds
shared_server: false,
..Default::default()
},
Expand All @@ -331,11 +337,9 @@ pub(crate) mod shared_client_tests {
cmd.arg("EVAL")
.arg(
r#"
local i = 0
while (true)
do
redis.call('ping')
i = i + 1
end
"#,
)
Expand All @@ -359,7 +363,7 @@ pub(crate) mod shared_client_tests {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
request_timeout: Some(1), // milliseconds
shared_server: true,
..Default::default()
},
Expand All @@ -376,15 +380,15 @@ pub(crate) mod shared_client_tests {

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_blocking_command_with_zero_timeout_blocks_indefinitely(
fn test_blocking_command_with_negative_timeout_returns_error(
#[values(false, true)] use_cluster: bool,
) {
// We test that when blocking command is passed with a negative timeout the command will return with an error
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
request_timeout: Some(1), // milliseconds
shared_server: true,
..Default::default()
},
Expand All @@ -403,29 +407,62 @@ pub(crate) mod shared_client_tests {

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_blocking_command_with_negative_timeout_returns_error(
fn test_blocking_command_with_zero_timeout_blocks_indefinitely(
#[values(false, true)] use_cluster: bool,
) {
// We test that when blocking command is passed with a negative timeout the command will return with an error
//"We test that when a blocking command is passed with a timeout duration of 0, it will block the client indefinitely
use redis::cluster_routing::{Route, SingleNodeRoutingInfo, SlotAddr};
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
shared_server: true,
..Default::default()
},
)
.await;
let config = TestConfiguration {
request_timeout: Some(1), // millisecond
shared_server: true,
..Default::default()
};
let mut test_basics = setup_test_basics(use_cluster, config.clone()).await;
let key = "foo";
// We're routing the request to specific node to verify later that BLPOP wasn't completed on that node
let routing = if use_cluster {
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
get_slot(key.as_bytes()),
SlotAddr::Master,
)),
))
} else {
None
};
let get_blpop_cmdstats = |mut client: Client, routing: Option<RoutingInfo>| async move {
let mut cmd = redis::Cmd::new();
cmd.arg("INFO").arg("ALL");
let info_res: Value = client
.send_command(&cmd, routing)
.await
.expect("Failed to get INFO ALL");
let res_str = redis::from_owned_redis_value::<String>(info_res)
.expect("INFO ALL result isn't string");
res_str
.lines()
.filter(|line| line.contains("cmdstat_blpop"))
.collect::<String>()
};
let blpop_stats_before =
get_blpop_cmdstats(test_basics.client.clone(), routing.clone()).await;
let cloned_routing = routing.clone();
let future = async move {
let mut cmd = redis::Cmd::new();
cmd.arg("BLPOP").arg("foo").arg(0); // `0` should block indefinitely
test_basics.client.send_command(&cmd, None).await
cmd.arg("BLPOP").arg(key).arg(0); // `0` should block indefinitely
test_basics.client.send_command(&cmd, cloned_routing).await
};
// We execute the command with Tokio's timeout wrapper to prevent the test from hanging indefinitely.
let tokio_timeout_result =
tokio::time::timeout(DEFAULT_RESPONSE_TIMEOUT * 2, future).await;
assert!(tokio_timeout_result.is_err());
// Verify that BLPOP command wasn't completed.
// We create a new client since the connection in the existing client should be blocked
let new_client = create_client(&test_basics.server, config).await;
let blpop_stats_after = get_blpop_cmdstats(new_client, routing).await;
assert_eq!(blpop_stats_before, blpop_stats_after,
"BLPOP command statistiscs has been changed. Before: {blpop_stats_before}, After: {blpop_stats_after}");
});
}

Expand Down
32 changes: 19 additions & 13 deletions glide-core/tests/utilities/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,10 @@ async fn setup_acl_for_cluster(
join_all(ops).await;
}

pub async fn setup_test_basics_internal(mut configuration: TestConfiguration) -> ClusterTestBasics {
let cluster = if !configuration.shared_server {
Some(RedisCluster::new(
configuration.use_tls,
&configuration.connection_info,
None,
None,
))
} else {
None
};

pub async fn create_cluster_client(
cluster: &Option<RedisCluster>,
mut configuration: TestConfiguration,
) -> Client {
let addresses = if !configuration.shared_server {
cluster.as_ref().unwrap().get_server_addresses()
} else {
Expand All @@ -257,7 +249,21 @@ pub async fn setup_test_basics_internal(mut configuration: TestConfiguration) ->
configuration.request_timeout = configuration.request_timeout.or(Some(10000));
let connection_request = create_connection_request(&addresses, &configuration);

let client = Client::new(connection_request.into()).await.unwrap();
Client::new(connection_request.into()).await.unwrap()
}

pub async fn setup_test_basics_internal(configuration: TestConfiguration) -> ClusterTestBasics {
let cluster = if !configuration.shared_server {
Some(RedisCluster::new(
configuration.use_tls,
&configuration.connection_info,
None,
None,
))
} else {
None
};
let client = create_cluster_client(&cluster, configuration).await;
ClusterTestBasics { cluster, client }
}

Expand Down
4 changes: 2 additions & 2 deletions glide-core/tests/utilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ pub async fn setup_acl(addr: &ConnectionAddr, connection_info: &RedisConnectionI
connection.send_packed_command(&cmd).await.unwrap();
}

#[derive(Eq, PartialEq, Default)]
#[derive(Eq, PartialEq, Default, Clone)]
pub enum ClusterMode {
#[default]
Disabled,
Expand Down Expand Up @@ -652,7 +652,7 @@ pub fn create_connection_request(
connection_request
}

#[derive(Default)]
#[derive(Default, Clone)]
pub struct TestConfiguration {
pub use_tls: bool,
pub connection_retry_strategy: Option<connection_request::ConnectionRetryStrategy>,
Expand Down
3 changes: 0 additions & 3 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ async def create_client(
addresses: Optional[List[NodeAddress]] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
request_timeout: Optional[int] = None,
) -> Union[RedisClient, RedisClusterClient]:
# Create async socket client
use_tls = request.config.getoption("--tls")
Expand All @@ -125,7 +124,6 @@ async def create_client(
credentials=credentials,
client_name=client_name,
protocol=protocol,
request_timeout=request_timeout,
)
return await RedisClusterClient.create(cluster_config)
else:
Expand All @@ -139,6 +137,5 @@ async def create_client(
database_id=database_id,
client_name=client_name,
protocol=protocol,
request_timeout=request_timeout,
)
return await RedisClient.create(config)
30 changes: 0 additions & 30 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1938,36 +1938,6 @@ async def test_cluster_fail_routing_by_address_if_no_port_is_provided(
await redis_client.info(route=ByAddressRoute("foo"))


@pytest.mark.asyncio
class TestExceptions:
@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_timeout_exception_with_long_running_lua(
self, redis_client: TRedisClient, request
):
# Create a client with minimal request timeout
is_cluster = isinstance(redis_client, RedisClusterClient)
new_client = await create_client(
request,
is_cluster,
addresses=redis_client.config.addresses,
request_timeout=1,
)
script = Script(
"""
local i = 0
while (i < 100000)
do
redis.call('ping')
i = i + 1
end
"""
)
with pytest.raises(TimeoutError):
await new_client.invoke_script(script, keys=[], args=[])
await new_client.close()


@pytest.mark.asyncio
class TestScripts:
@pytest.mark.smoke_test
Expand Down

0 comments on commit 2558dd0

Please sign in to comment.