Skip to content

Commit

Permalink
Introduce engine version validation when sharded subscriptions are co…
Browse files Browse the repository at this point in the history
…nfigured

Signed-off-by: ikolomi <ikolomin@amazon.com>
  • Loading branch information
ikolomi committed Dec 17, 2024
1 parent 79c3bc9 commit a6e76f8
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#### Fixes
* Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381))
* Core: Ensure cluster client creation fail when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819))

#### Operational Enhancements

Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "6.3"

[features]
socket-layer = [
Expand Down
58 changes: 55 additions & 3 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use redis::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, RoutingInfo, SingleNodeRoutingInfo,
};
use redis::cluster_slotmap::ReadFromReplicaStrategy;
use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value};
use redis::{
Cmd, ErrorKind, FromRedisValue, InfoDict, ObjectType, PushInfo, RedisError, RedisResult,
ScanStateRC, Value,
};
pub use standalone_client::StandaloneClient;
use std::io;
use std::sync::atomic::{AtomicIsize, Ordering};
Expand All @@ -25,6 +28,7 @@ mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use tokio::sync::mpsc;
use versions::Versioning;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
Expand Down Expand Up @@ -607,15 +611,63 @@ async fn create_cluster_client(
};
builder = builder.tls(tls);
}
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() {
builder = builder.pubsub_subscriptions(pubsub_subscriptions);
}

// Always use with Glide
builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL);

let client = builder.build()?;
client.get_async_connection(push_sender).await
let mut con = client.get_async_connection(push_sender).await?;

// This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0,
// preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received.
// The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub.
// For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed,
// but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node.
//
// One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions.
// However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue.
// Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions.

if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) {
let info_res = con
.route_command(
redis::cmd("INFO").arg("SERVER"),
RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
)
.await?;
let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?;
match info_dict.get::<String>("redis_version") {
Some(version) => match (Versioning::new(version), Versioning::new("7.0")) {
(Some(server_ver), Some(min_ver)) => {
if server_ver < min_ver {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Sharded subscriptions provided, but the engine version is < 7.0",
)));
}
}
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse engine version",
)))
}
},
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Could not determine engine version from INFO result",
)))
}
}
}
}

Ok(con)
}

#[derive(thiserror::Error)]
Expand Down
89 changes: 88 additions & 1 deletion glide-core/tests/test_cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ mod cluster_client_tests {
use std::collections::HashMap;

use super::*;
use glide_core::connection_request::ReadFrom;
use cluster::{setup_cluster_with_replicas, LONG_CLUSTER_TEST_TIMEOUT};
use glide_core::client::Client;
use glide_core::connection_request::{
self, PubSubChannelsOrPatterns, PubSubSubscriptions, ReadFrom,
};
use redis::cluster_routing::{
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
};
use redis::InfoDict;
use rstest::rstest;
use utilities::cluster::{setup_test_basics_internal, SHORT_CLUSTER_TEST_TIMEOUT};
use utilities::*;
use versions::Versioning;

fn count_primary_or_replica(value: &str) -> (u16, u16) {
if value.contains("role:master") {
Expand Down Expand Up @@ -214,4 +220,85 @@ mod cluster_client_tests {
assert_eq!(replicas, 1);
});
}

#[rstest]
#[timeout(LONG_CLUSTER_TEST_TIMEOUT)]
fn test_fail_creation_with_unsupported_sharded_pubsub() {
block_on_all(async {
let mut test_basics = setup_cluster_with_replicas(
TestConfiguration {
cluster_mode: ClusterMode::Enabled,
shared_server: false,
..Default::default()
},
0,
3,
)
.await;

// get engine version
let cmd = redis::cmd("INFO");
let info = test_basics
.client
.send_command(
&cmd,
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
)
.await
.unwrap();

let info_dict: InfoDict = redis::from_owned_redis_value(info).unwrap();
match info_dict.get::<String>("redis_version") {
Some(version) => match (Versioning::new(version), Versioning::new("7.0")) {
(Some(server_ver), Some(min_ver)) => {
if server_ver < min_ver {
// try to create client with initial nodes lacking the target sharded subscription node
let cluster = test_basics.cluster.unwrap();
let mut addresses = cluster.get_server_addresses();
addresses.truncate(1);

let mut connection_request =
connection_request::ConnectionRequest::new();
connection_request.addresses =
addresses.iter().map(get_address_info).collect();

connection_request.cluster_mode_enabled = true;
// Assumes the current implementation of the test cluster, where slots are distributed across nodes
// in a monotonically increasing order.
let mut last_slot_channel = PubSubChannelsOrPatterns::new();
last_slot_channel
.channels_or_patterns
.push("last-slot-channel-{16383}".as_bytes().into());

let mut subs = PubSubSubscriptions::new();
// First try to create a client with the Exact subscription
subs.channels_or_patterns_by_type
.insert(0, last_slot_channel.clone());
connection_request.pubsub_subscriptions =
protobuf::MessageField::from_option(Some(subs.clone()));

let _client = Client::new(connection_request.clone().into(), None)
.await
.unwrap();

// Now try to create a client with a Sharded subscription which should fail
subs.channels_or_patterns_by_type
.insert(2, last_slot_channel);
connection_request.pubsub_subscriptions =
protobuf::MessageField::from_option(Some(subs));

let client = Client::new(connection_request.into(), None).await;
assert!(client.is_err());
}
}
_ => {
panic!("Failed to parse engine version");
}
},
_ => {
panic!("Could not determine engine version from INFO result");
}
}
});
}
}

0 comments on commit a6e76f8

Please sign in to comment.