diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f4421333d..1f8ebb2985 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ #### Fixes * Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381)) +* Core: Ensure cluster client creation fail when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819)) #### Operational Enhancements diff --git a/glide-core/Cargo.toml b/glide-core/Cargo.toml index bd12bb09c9..394c953f49 100644 --- a/glide-core/Cargo.toml +++ b/glide-core/Cargo.toml @@ -41,6 +41,7 @@ nanoid = "0.4.0" async-trait = { version = "0.1.24" } serde_json = "1" serde = { version = "1", features = ["derive"] } +versions = "6.3" [features] socket-layer = [ diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 29a3a05393..a560e17697 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -12,7 +12,10 @@ use redis::cluster_routing::{ MultipleNodeRoutingInfo, ResponsePolicy, Routable, RoutingInfo, SingleNodeRoutingInfo, }; use redis::cluster_slotmap::ReadFromReplicaStrategy; -use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value}; +use redis::{ + Cmd, ErrorKind, FromRedisValue, InfoDict, ObjectType, PushInfo, RedisError, RedisResult, + ScanStateRC, Value, +}; pub use standalone_client::StandaloneClient; use std::io; use std::sync::atomic::{AtomicIsize, Ordering}; @@ -25,6 +28,7 @@ mod reconnecting_connection; mod standalone_client; mod value_conversion; use tokio::sync::mpsc; +use versions::Versioning; pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1); pub const DEFAULT_RETRIES: u32 = 3; @@ -607,7 +611,7 @@ async fn create_cluster_client( }; builder = builder.tls(tls); } - if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions { + if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() { builder = builder.pubsub_subscriptions(pubsub_subscriptions); } @@ -615,7 +619,55 @@ async fn create_cluster_client( builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL); let client = builder.build()?; - client.get_async_connection(push_sender).await + let mut con = client.get_async_connection(push_sender).await?; + + // This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0, + // preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received. + // The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub. + // For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed, + // but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node. + // + // One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions. + // However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue. + // Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions. + + if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions { + if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) { + let info_res = con + .route_command( + redis::cmd("INFO").arg("SERVER"), + RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), + ) + .await?; + let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?; + match info_dict.get::("redis_version") { + Some(version) => match (Versioning::new(version), Versioning::new("7.0")) { + (Some(server_ver), Some(min_ver)) => { + if server_ver < min_ver { + return Err(RedisError::from(( + ErrorKind::InvalidClientConfig, + "Sharded subscriptions provided, but the engine version is < 7.0", + ))); + } + } + _ => { + return Err(RedisError::from(( + ErrorKind::ResponseError, + "Failed to parse engine version", + ))) + } + }, + _ => { + return Err(RedisError::from(( + ErrorKind::ResponseError, + "Could not determine engine version from INFO result", + ))) + } + } + } + } + + Ok(con) } #[derive(thiserror::Error)] diff --git a/glide-core/tests/test_cluster_client.rs b/glide-core/tests/test_cluster_client.rs index 2943ab21bb..ec795481c6 100644 --- a/glide-core/tests/test_cluster_client.rs +++ b/glide-core/tests/test_cluster_client.rs @@ -7,13 +7,19 @@ mod cluster_client_tests { use std::collections::HashMap; use super::*; - use glide_core::connection_request::ReadFrom; + use cluster::{setup_cluster_with_replicas, LONG_CLUSTER_TEST_TIMEOUT}; + use glide_core::client::Client; + use glide_core::connection_request::{ + self, PubSubChannelsOrPatterns, PubSubSubscriptions, ReadFrom, + }; use redis::cluster_routing::{ MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; + use redis::InfoDict; use rstest::rstest; use utilities::cluster::{setup_test_basics_internal, SHORT_CLUSTER_TEST_TIMEOUT}; use utilities::*; + use versions::Versioning; fn count_primary_or_replica(value: &str) -> (u16, u16) { if value.contains("role:master") { @@ -214,4 +220,85 @@ mod cluster_client_tests { assert_eq!(replicas, 1); }); } + + #[rstest] + #[timeout(LONG_CLUSTER_TEST_TIMEOUT)] + fn test_fail_creation_with_unsupported_sharded_pubsub() { + block_on_all(async { + let mut test_basics = setup_cluster_with_replicas( + TestConfiguration { + cluster_mode: ClusterMode::Enabled, + shared_server: false, + ..Default::default() + }, + 0, + 3, + ) + .await; + + // get engine version + let cmd = redis::cmd("INFO"); + let info = test_basics + .client + .send_command( + &cmd, + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)), + ) + .await + .unwrap(); + + let info_dict: InfoDict = redis::from_owned_redis_value(info).unwrap(); + match info_dict.get::("redis_version") { + Some(version) => match (Versioning::new(version), Versioning::new("7.0")) { + (Some(server_ver), Some(min_ver)) => { + if server_ver < min_ver { + // try to create client with initial nodes lacking the target sharded subscription node + let cluster = test_basics.cluster.unwrap(); + let mut addresses = cluster.get_server_addresses(); + addresses.truncate(1); + + let mut connection_request = + connection_request::ConnectionRequest::new(); + connection_request.addresses = + addresses.iter().map(get_address_info).collect(); + + connection_request.cluster_mode_enabled = true; + // Assumes the current implementation of the test cluster, where slots are distributed across nodes + // in a monotonically increasing order. + let mut last_slot_channel = PubSubChannelsOrPatterns::new(); + last_slot_channel + .channels_or_patterns + .push("last-slot-channel-{16383}".as_bytes().into()); + + let mut subs = PubSubSubscriptions::new(); + // First try to create a client with the Exact subscription + subs.channels_or_patterns_by_type + .insert(0, last_slot_channel.clone()); + connection_request.pubsub_subscriptions = + protobuf::MessageField::from_option(Some(subs.clone())); + + let _client = Client::new(connection_request.clone().into(), None) + .await + .unwrap(); + + // Now try to create a client with a Sharded subscription which should fail + subs.channels_or_patterns_by_type + .insert(2, last_slot_channel); + connection_request.pubsub_subscriptions = + protobuf::MessageField::from_option(Some(subs)); + + let client = Client::new(connection_request.into(), None).await; + assert!(client.is_err()); + } + } + _ => { + panic!("Failed to parse engine version"); + } + }, + _ => { + panic!("Could not determine engine version from INFO result"); + } + } + }); + } }