From bf2cbe26ffccfcb3559468170d5b7cc2380fd531 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Sun, 5 Jan 2025 09:41:28 +0000 Subject: [PATCH 01/17] [redis-rs][core] Move connection refresh to the background Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 30 ++- .../redis-rs/redis/src/cluster_async/mod.rs | 243 +++++++++++------- 2 files changed, 173 insertions(+), 100 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 955d24d9e9..7de66f59cf 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -2,7 +2,7 @@ use crate::cluster_async::ConnectionFuture; use crate::cluster_routing::{Route, ShardAddrs, SlotAddr}; use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; use crate::cluster_topology::TopologyHash; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use futures::FutureExt; use rand::seq::IteratorRandom; use std::net::IpAddr; @@ -10,6 +10,8 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use telemetrylib::Telemetry; +use tokio::task::JoinHandle; + /// Count the number of connections in a connections_map object macro_rules! count_connections { ($conn_map:expr) => {{ @@ -121,6 +123,12 @@ pub(crate) enum ConnectionType { pub(crate) struct ConnectionsMap(pub(crate) DashMap>); +pub(crate) struct RefreshState { + pub handle: JoinHandle<()>, // The currect running refresh task + pub node_conn: Option> // The refreshed connection after the task is done +} + + impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for item in self.0.iter() { @@ -139,6 +147,14 @@ pub(crate) struct ConnectionsContainer { pub(crate) slot_map: SlotMap, read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, + + + // Holds all the failed addresses that started a refresh task. 
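+    // Entries are drained in poll_flush (update_refreshed_connection), which removes the stale
+    // connection for each such address from the connection map before the next batch of requests.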
+ pub(crate) refresh_addresses_started: DashSet, + // Follow the refresh ops on the connections + pub(crate) refresh_operations: DashMap>, + // Holds all the refreshed addresses that are ready to be inserted into the connection_map + pub(crate) refresh_addresses_done: DashSet, } impl Drop for ConnectionsContainer { @@ -155,6 +171,9 @@ impl Default for ConnectionsContainer { slot_map: Default::default(), read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary, topology_hash: 0, + refresh_addresses_started: DashSet::new(), + refresh_operations: DashMap::new(), + refresh_addresses_done: DashSet::new(), } } } @@ -182,6 +201,9 @@ where slot_map, read_from_replica_strategy, topology_hash, + refresh_addresses_started: DashSet::new(), + refresh_operations: DashMap::new(), + refresh_addresses_done: DashSet::new(), } } @@ -572,6 +594,9 @@ mod tests { connection_map, read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()), topology_hash: 0, + refresh_addresses_started: DashSet::new(), + refresh_operations: DashMap::new(), + refresh_addresses_done: DashSet::new(), } } @@ -628,6 +653,9 @@ mod tests { connection_map, read_from_replica_strategy: strategy, topology_hash: 0, + refresh_addresses_started: DashSet::new(), + refresh_operations: DashMap::new(), + refresh_addresses_done: DashSet::new(), } } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 534fdd429e..679098ef02 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -40,18 +40,13 @@ use crate::{ commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, FromRedisValue, InfoDict, }; +use connections_container::RefreshState; use dashmap::DashMap; use std::{ - collections::{HashMap, HashSet}, - fmt, io, mem, - net::{IpAddr, SocketAddr}, - pin::Pin, - sync::{ + collections::{HashMap, HashSet}, fmt, io, iter::once, mem, net::{IpAddr, SocketAddr}, pin::Pin, sync::{ atomic::{self, AtomicUsize, Ordering}, Arc, Mutex, - }, - task::{self, Poll}, - time::SystemTime, + }, task::{self, Poll}, time::SystemTime }; use strum_macros::Display; #[cfg(feature = "tokio-comp")] @@ -1341,13 +1336,13 @@ where } // identify nodes with closed connection - let mut addrs_to_refresh = Vec::new(); + let mut addrs_to_refresh = HashSet::new(); for (addr, con_fut) in &all_valid_conns { let con = con_fut.clone().await; // connection object might be present despite the transport being closed if con.is_closed() { // transport is closed, need to refresh - addrs_to_refresh.push(addr.clone()); + addrs_to_refresh.insert(addr.clone()); } } @@ -1365,68 +1360,95 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, - false, - ) - .await; + ).await; } } async fn refresh_connections( inner: Arc>, - addresses: Vec, + addresses: HashSet, conn_type: RefreshConnectionType, - check_existing_conn: bool, ) { info!("Started refreshing connections to {:?}", addresses); - let mut tasks = FuturesUnordered::new(); - let inner = inner.clone(); - for address in addresses.into_iter() { - let inner = inner.clone(); + let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR); + let refresh_ops_map = &connections_container.refresh_operations; - tasks.push(async move { - let node_option = if check_existing_conn { - let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR); - connections_container.remove_node(&address) - } else { - None - }; + for address 
in addresses { + if refresh_ops_map.contains_key(&address) { + info!("Skipping refresh for {}: already in progress", address); + continue; + } + + let inner_clone = inner.clone(); + let address_clone = address.clone(); + let address_clone_for_task = address.clone(); + + let handle = tokio::spawn(async move { + info!("Refreshing connection task to {:?} started", address_clone_for_task); + let _ = async { + // Add this address to be removed in poll_flush so all requests see a consistent connection map. + // See next comment for elaborated explanation. + inner_clone.conn_lock.read().expect(MUTEX_READ_ERR).refresh_addresses_done.insert(address_clone_for_task.clone()); + + let mut cluster_params = inner_clone.cluster_params.read().expect(MUTEX_READ_ERR).clone(); + let subs_guard = inner_clone.subscriptions_by_address.read().await; + cluster_params.pubsub_subscriptions = subs_guard.get(&address_clone_for_task).cloned(); + drop(subs_guard); - // Override subscriptions for this connection - let mut cluster_params = inner.cluster_params.read().expect(MUTEX_READ_ERR).clone(); - let subs_guard = inner.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); - drop(subs_guard); - - let node = get_or_create_conn( - &address, - node_option, - &cluster_params, - conn_type, - inner.glide_connection_options.clone(), - ) - .await; + let node_result = get_or_create_conn( + &address_clone_for_task, + None, + &cluster_params, + conn_type, + inner_clone.glide_connection_options.clone(), + ) + .await; - (address, node) + match node_result { + Ok(node) => { + // Maintain the newly refreshed connection separately from the main connection map. + // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. + // This approach ensures that all requests within the current batch interact with a consistent connection map, + // preventing potential reordering issues. + // + // By delaying the integration of the refreshed connection: + // + // 1. We maintain consistency throughout the processing of a batch of requests. + // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations. + // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior. + // + // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is + // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information + // with the requirement for consistent request handling within each processing cycle. + let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + if let Some(mut refresh_state) = connection_container.refresh_operations.get_mut(&address_clone_for_task) { + refresh_state.node_conn = Some(node); + } + connection_container.refresh_addresses_done.insert(address_clone_for_task); + Ok(()) + } + Err(err) => { + warn!( + "Failed to refresh connection for node {}. 
Error: `{:?}`", + address_clone_for_task, err + ); + Err(err) + } + } + }.await; + + info!("Refreshing connection task to {:?} is done", address_clone); }); - } - // Poll connection tasks as soon as each one finishes - while let Some(result) = tasks.next().await { - match result { - (address, Ok(node)) => { - let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR); - connections_container.replace_or_add_connection_for_address(address, node); - } - (address, Err(err)) => { - warn!( - "Failed to refresh connection for node {}. Error: `{:?}`", - address, err - ); - } - } + // Keep the task handle into the RefreshState of this address + info!("Inserting tokio task to refresh_ops map of address {:?}", address.clone()); + refresh_ops_map.insert(address, RefreshState { + handle, + node_conn: None, + }); } - debug!("refresh connections completed"); + debug!("refresh connection tasts initiated"); } async fn aggregate_results( @@ -1762,11 +1784,9 @@ where // immediately trigger connection reestablishment Self::refresh_connections( inner.clone(), - addrs_to_refresh.into_iter().collect(), + addrs_to_refresh, RefreshConnectionType::AllConnections, - false, - ) - .await; + ).await; } } @@ -1798,9 +1818,8 @@ where if !failed_connections.is_empty() { Self::refresh_connections( inner, - failed_connections, + failed_connections.into_iter().collect::>(), RefreshConnectionType::OnlyManagementConnection, - true, ) .await; } @@ -2271,32 +2290,12 @@ where let (address, mut conn) = match conn_check { ConnectionCheck::Found((address, connection)) => (address, connection.await), ConnectionCheck::OnlyAddress(addr) => { - let mut this_conn_params = core.get_cluster_param(|params| params.clone())?; - let subs_guard = core.subscriptions_by_address.read().await; - this_conn_params.pubsub_subscriptions = subs_guard.get(addr.as_str()).cloned(); - drop(subs_guard); - match connect_and_check::( - &addr, - this_conn_params, - None, - RefreshConnectionType::AllConnections, - None, - core.glide_connection_options.clone(), - ) - .await - .get_node() - { - Ok(node) => { - let connection_clone = node.user_connection.conn.clone().await; - let connections = core.conn_lock.read().expect(MUTEX_READ_ERR); - let address = connections.replace_or_add_connection_for_address(addr, node); - drop(connections); - (address, connection_clone) - } - Err(err) => { - return Err(err); - } - } + // No connection in for this address in the conn_map + Self::refresh_connections(core, HashSet::from_iter(once(addr)),RefreshConnectionType::AllConnections).await; + return Err(RedisError::from(( + ErrorKind::AllConnectionsUnavailable, + "No connection for the address, started a refresh task", + ))); } ConnectionCheck::RandomConnection => { let random_conn = core @@ -2393,6 +2392,47 @@ where Self::try_request(info, core).await } + fn update_refreshed_connection(&mut self) { + loop { + let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); + + // Process refresh_addresses_started + let addresses_to_remove: Vec = connections_container.refresh_addresses_started.iter().map(|r| r.key().clone()).collect(); + for address in addresses_to_remove { + connections_container.refresh_addresses_started.remove(&address); + connections_container.remove_node(&address); + } + + // Process refresh_addresses_done + let addresses_done: Vec = connections_container.refresh_addresses_done.iter().map(|r| r.key().clone()).collect(); + for address in addresses_done { + connections_container.refresh_addresses_done.remove(&address); + + if let 
Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) { + info!("update_refreshed_connection: Update conn for addr: {}", address); + + // Take the node_conn out of RefreshState, replacing it with None + if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { + info!("update_refreshed_connection: replacing/adding the conn"); + // Move the node_conn to the function + connections_container.replace_or_add_connection_for_address(address.clone(), node_conn); + } + } + // Remove this entry from refresh_ops_map + connections_container.refresh_operations.remove(&address); + + } + + // Check if both sets are empty + if connections_container.refresh_addresses_started.is_empty() && connections_container.refresh_addresses_done.is_empty() { + break; + } + + // Release the lock before the next iteration + drop(connections_container); + } + } + fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll { let retry_params = self .inner @@ -2498,7 +2538,7 @@ where } Next::Reconnect { request, target } => { poll_flush_action = - poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target])); + poll_flush_action.change_state(PollFlushAction::Reconnect(HashSet::from_iter([target]))); if let Some(request) = request { self.inner.pending_requests.lock().unwrap().push(request); } @@ -2543,7 +2583,7 @@ where enum PollFlushAction { None, RebuildSlots, - Reconnect(Vec), + Reconnect(HashSet), ReconnectFromInitialConnections, } @@ -2581,18 +2621,18 @@ where fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> { let Message { cmd, sender } = msg; - + let info = RequestInfo { cmd }; - + self.inner .pending_requests .lock() .unwrap() .push(PendingRequest { - retry: 0, - sender, - info, - }); + retry: 0, + sender, + info, + }); Ok(()) } @@ -2617,6 +2657,12 @@ where return Poll::Pending; } + // Updating the connection_map with all the refreshed_connections + // In case of active poll_recovery, the work should + // take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of + // non-relevant addresses. 
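+        // Concretely, update_refreshed_connection drains refresh_addresses_started (dropping the
+        // stale connections from the map) and refresh_addresses_done (installing the refreshed
+        // ones), so every request in the next batch observes one consistent connection map.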
+ self.update_refreshed_connection(); + match ready!(self.poll_complete(cx)) { PollFlushAction::None => return Poll::Ready(Ok(())), PollFlushAction::RebuildSlots => { @@ -2632,8 +2678,7 @@ where ClusterConnInner::refresh_connections( self.inner.clone(), addresses, - RefreshConnectionType::OnlyUserConnection, - true, + RefreshConnectionType::AllConnections, ), ))); } From f850d992c860f405664359611af5415d4c6fcd3f Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Sun, 5 Jan 2025 10:03:37 +0000 Subject: [PATCH 02/17] formating using lint Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 4 +- .../redis-rs/redis/src/cluster_async/mod.rs | 150 ++++++++++++------ 2 files changed, 106 insertions(+), 48 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 7de66f59cf..aa96472da7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -125,10 +125,9 @@ pub(crate) struct ConnectionsMap(pub(crate) DashMap { pub handle: JoinHandle<()>, // The currect running refresh task - pub node_conn: Option> // The refreshed connection after the task is done + pub node_conn: Option>, // The refreshed connection after the task is done } - impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for item in self.0.iter() { @@ -148,7 +147,6 @@ pub(crate) struct ConnectionsContainer { read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, - // Holds all the failed addresses that started a refresh task. pub(crate) refresh_addresses_started: DashSet, // Follow the refresh ops on the connections diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 679098ef02..b2a00026d7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -43,10 +43,18 @@ use crate::{ use connections_container::RefreshState; use dashmap::DashMap; use std::{ - collections::{HashMap, HashSet}, fmt, io, iter::once, mem, net::{IpAddr, SocketAddr}, pin::Pin, sync::{ + collections::{HashMap, HashSet}, + fmt, io, + iter::once, + mem, + net::{IpAddr, SocketAddr}, + pin::Pin, + sync::{ atomic::{self, AtomicUsize, Ordering}, Arc, Mutex, - }, task::{self, Poll}, time::SystemTime + }, + task::{self, Poll}, + time::SystemTime, }; use strum_macros::Display; #[cfg(feature = "tokio-comp")] @@ -1360,7 +1368,8 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, - ).await; + ) + .await; } } @@ -1379,21 +1388,34 @@ where info!("Skipping refresh for {}: already in progress", address); continue; } - + let inner_clone = inner.clone(); let address_clone = address.clone(); let address_clone_for_task = address.clone(); let handle = tokio::spawn(async move { - info!("Refreshing connection task to {:?} started", address_clone_for_task); + info!( + "Refreshing connection task to {:?} started", + address_clone_for_task + ); let _ = async { // Add this address to be removed in poll_flush so all requests see a consistent connection map. - // See next comment for elaborated explanation. 
- inner_clone.conn_lock.read().expect(MUTEX_READ_ERR).refresh_addresses_done.insert(address_clone_for_task.clone()); - - let mut cluster_params = inner_clone.cluster_params.read().expect(MUTEX_READ_ERR).clone(); + // See next comment for elaborated explanation. + inner_clone + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_addresses_done + .insert(address_clone_for_task.clone()); + + let mut cluster_params = inner_clone + .cluster_params + .read() + .expect(MUTEX_READ_ERR) + .clone(); let subs_guard = inner_clone.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = subs_guard.get(&address_clone_for_task).cloned(); + cluster_params.pubsub_subscriptions = + subs_guard.get(&address_clone_for_task).cloned(); drop(subs_guard); let node_result = get_or_create_conn( @@ -1409,7 +1431,7 @@ where Ok(node) => { // Maintain the newly refreshed connection separately from the main connection map. // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. - // This approach ensures that all requests within the current batch interact with a consistent connection map, + // This approach ensures that all requests within the current batch interact with a consistent connection map, // preventing potential reordering issues. // // By delaying the integration of the refreshed connection: @@ -1421,11 +1443,17 @@ where // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information // with the requirement for consistent request handling within each processing cycle. - let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); - if let Some(mut refresh_state) = connection_container.refresh_operations.get_mut(&address_clone_for_task) { + let connection_container = + inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + if let Some(mut refresh_state) = connection_container + .refresh_operations + .get_mut(&address_clone_for_task) + { refresh_state.node_conn = Some(node); } - connection_container.refresh_addresses_done.insert(address_clone_for_task); + connection_container + .refresh_addresses_done + .insert(address_clone_for_task); Ok(()) } Err(err) => { @@ -1436,17 +1464,24 @@ where Err(err) } } - }.await; + } + .await; info!("Refreshing connection task to {:?} is done", address_clone); }); // Keep the task handle into the RefreshState of this address - info!("Inserting tokio task to refresh_ops map of address {:?}", address.clone()); - refresh_ops_map.insert(address, RefreshState { - handle, + info!( + "Inserting tokio task to refresh_ops map of address {:?}", + address.clone() + ); + refresh_ops_map.insert( + address, + RefreshState { + handle, node_conn: None, - }); + }, + ); } debug!("refresh connection tasts initiated"); } @@ -1786,7 +1821,8 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, - ).await; + ) + .await; } } @@ -2291,7 +2327,12 @@ where ConnectionCheck::Found((address, connection)) => (address, connection.await), ConnectionCheck::OnlyAddress(addr) => { // No connection in for this address in the conn_map - Self::refresh_connections(core, HashSet::from_iter(once(addr)),RefreshConnectionType::AllConnections).await; + Self::refresh_connections( + core, + HashSet::from_iter(once(addr)), + RefreshConnectionType::AllConnections, + ) + .await; return Err(RedisError::from(( 
ErrorKind::AllConnectionsUnavailable, "No connection for the address, started a refresh task", @@ -2395,39 +2436,58 @@ where fn update_refreshed_connection(&mut self) { loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); - + // Process refresh_addresses_started - let addresses_to_remove: Vec = connections_container.refresh_addresses_started.iter().map(|r| r.key().clone()).collect(); + let addresses_to_remove: Vec = connections_container + .refresh_addresses_started + .iter() + .map(|r| r.key().clone()) + .collect(); for address in addresses_to_remove { - connections_container.refresh_addresses_started.remove(&address); + connections_container + .refresh_addresses_started + .remove(&address); connections_container.remove_node(&address); } - + // Process refresh_addresses_done - let addresses_done: Vec = connections_container.refresh_addresses_done.iter().map(|r| r.key().clone()).collect(); + let addresses_done: Vec = connections_container + .refresh_addresses_done + .iter() + .map(|r| r.key().clone()) + .collect(); for address in addresses_done { - connections_container.refresh_addresses_done.remove(&address); - - if let Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) { - info!("update_refreshed_connection: Update conn for addr: {}", address); - + connections_container + .refresh_addresses_done + .remove(&address); + + if let Some(mut refresh_state) = + connections_container.refresh_operations.get_mut(&address) + { + info!( + "update_refreshed_connection: Update conn for addr: {}", + address + ); + // Take the node_conn out of RefreshState, replacing it with None if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { info!("update_refreshed_connection: replacing/adding the conn"); // Move the node_conn to the function - connections_container.replace_or_add_connection_for_address(address.clone(), node_conn); + connections_container + .replace_or_add_connection_for_address(address.clone(), node_conn); } } // Remove this entry from refresh_ops_map connections_container.refresh_operations.remove(&address); - } - + // Check if both sets are empty - if connections_container.refresh_addresses_started.is_empty() && connections_container.refresh_addresses_done.is_empty() { + if connections_container.refresh_addresses_started.is_empty() + && connections_container.refresh_addresses_done.is_empty() + { break; } - + // Release the lock before the next iteration drop(connections_container); } @@ -2537,8 +2597,8 @@ where } } Next::Reconnect { request, target } => { - poll_flush_action = - poll_flush_action.change_state(PollFlushAction::Reconnect(HashSet::from_iter([target]))); + poll_flush_action = poll_flush_action + .change_state(PollFlushAction::Reconnect(HashSet::from_iter([target]))); if let Some(request) = request { self.inner.pending_requests.lock().unwrap().push(request); } @@ -2621,18 +2681,18 @@ where fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> { let Message { cmd, sender } = msg; - + let info = RequestInfo { cmd }; - + self.inner .pending_requests .lock() .unwrap() .push(PendingRequest { - retry: 0, - sender, - info, - }); + retry: 0, + sender, + info, + }); Ok(()) } @@ -2659,7 +2719,7 @@ where // Updating the connection_map with all the refreshed_connections // In case of active poll_recovery, the work should - // take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of + // take care of the refreshed_connection, add them if still 
relevant, and kill the refresh_tasks of // non-relevant addresses. self.update_refreshed_connection(); From 30b35f293a06ef1361bb841dd25a089af06c6d37 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Mon, 6 Jan 2025 11:13:47 +0000 Subject: [PATCH 03/17] Fixing bug - Refresh addresses started to take place Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index b2a00026d7..7bda70fbb4 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1405,7 +1405,7 @@ where .conn_lock .read() .expect(MUTEX_READ_ERR) - .refresh_addresses_done + .refresh_addresses_started .insert(address_clone_for_task.clone()); let mut cluster_params = inner_clone @@ -1443,6 +1443,10 @@ where // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information // with the requirement for consistent request handling within each processing cycle. + debug!( + "Succeeded to refresh connection for node {}.", + address_clone_for_task + ); let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); if let Some(mut refresh_state) = connection_container @@ -2434,6 +2438,7 @@ where } fn update_refreshed_connection(&mut self) { + trace!("update_refreshed_connection started"); loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); @@ -2464,17 +2469,17 @@ where if let Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) { - info!( - "update_refreshed_connection: Update conn for addr: {}", - address - ); - // Take the node_conn out of RefreshState, replacing it with None if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { - info!("update_refreshed_connection: replacing/adding the conn"); + debug!( + "update_refreshed_connection: replacing/adding the connection: {}", + address + ); // Move the node_conn to the function connections_container .replace_or_add_connection_for_address(address.clone(), node_conn); + } else { + debug!("update_refreshed_connection: reconnection failed, no connection for address: {}", address); } } // Remove this entry from refresh_ops_map From b2caf01970c9f3ff57275332a6e35c74b6218bd1 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Mon, 6 Jan 2025 16:21:30 +0000 Subject: [PATCH 04/17] Bug fix Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 7bda70fbb4..38228821a7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1454,20 +1454,21 @@ where .get_mut(&address_clone_for_task) { refresh_state.node_conn = Some(node); - } - connection_container - .refresh_addresses_done - .insert(address_clone_for_task); - Ok(()) + }; } Err(err) => { warn!( "Failed to refresh connection for node {}. 
Error: `{:?}`", address_clone_for_task, err ); - Err(err) } } + inner_clone + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_addresses_done + .insert(address_clone_for_task); } .await; @@ -1487,7 +1488,7 @@ where }, ); } - debug!("refresh connection tasts initiated"); + debug!("refresh connection tasks initiated"); } async fn aggregate_results( @@ -2338,7 +2339,7 @@ where ) .await; return Err(RedisError::from(( - ErrorKind::AllConnectionsUnavailable, + ErrorKind::ConnectionNotFoundForRoute, "No connection for the address, started a refresh task", ))); } @@ -2442,6 +2443,13 @@ where loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); + // Check if both sets are empty + if connections_container.refresh_addresses_started.is_empty() + && connections_container.refresh_addresses_done.is_empty() + { + break; + } + // Process refresh_addresses_started let addresses_to_remove: Vec = connections_container .refresh_addresses_started @@ -2483,14 +2491,10 @@ where } } // Remove this entry from refresh_ops_map - connections_container.refresh_operations.remove(&address); - } - - // Check if both sets are empty - if connections_container.refresh_addresses_started.is_empty() - && connections_container.refresh_addresses_done.is_empty() - { - break; + match connections_container.refresh_operations.remove(&address) { + Some(_) => (), + None => warn!("update_refreshed_connection: No refresh operation found to remove for address: {:?}", address), + } } // Release the lock before the next iteration From 4e6535f7e89ca50a75fae24fae689c361fe7843d Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 7 Jan 2025 09:48:00 +0000 Subject: [PATCH 05/17] Fix update_passwd test in python Signed-off-by: GilboaAWS --- python/python/tests/conftest.py | 32 ++++++++++++++++++++++++++++++++ python/python/tests/test_auth.py | 1 + 2 files changed, 33 insertions(+) diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index b2a97b4d0a..9ad3ee6355 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -224,6 +224,22 @@ async def glide_client( await client.close() +@pytest.fixture(scope="function") +async def glide_client( + request, + cluster_mode: bool, + protocol: ProtocolVersion, + request_timeout: int, +) -> AsyncGenerator[TGlideClient, None]: + "Get async socket client for tests" + client = await create_client( + request, cluster_mode, protocol=protocol, request_timeout=request_timeout + ) + yield client + await test_teardown(request, cluster_mode, protocol) + await client.close() + + @pytest.fixture(scope="function") async def management_client( request, @@ -237,6 +253,22 @@ async def management_client( await client.close() +@pytest.fixture(scope="function") +async def management_client( + request, + cluster_mode: bool, + protocol: ProtocolVersion, + request_timeout: int, +) -> AsyncGenerator[TGlideClient, None]: + "Get async socket client for tests, used to manage the state when tests are on the client ability to connect" + client = await create_client( + request, cluster_mode, protocol=protocol, request_timeout=request_timeout + ) + yield client + await test_teardown(request, cluster_mode, protocol) + await client.close() + + async def create_client( request, cluster_mode: bool, diff --git a/python/python/tests/test_auth.py b/python/python/tests/test_auth.py index 7e3fc67851..612c04786f 100644 --- a/python/python/tests/test_auth.py +++ b/python/python/tests/test_auth.py @@ -36,6 +36,7 @@ async def cleanup(self, request, 
management_client: TGlideClient): @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + @pytest.mark.parametrize("request_timeout", [3000]) async def test_update_connection_password( self, glide_client: TGlideClient, management_client: TGlideClient ): From 2d93b4ab965b02a6a6b469d55c3501bd6562eb79 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 7 Jan 2025 09:57:08 +0000 Subject: [PATCH 06/17] Clear refresh data on reconnect to initial nodes Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 27 +++++++++++++++++++ .../redis-rs/redis/src/cluster_async/mod.rs | 7 +++++ 2 files changed, 34 insertions(+) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index aa96472da7..a18ccfc35b 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -10,6 +10,8 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use telemetrylib::Telemetry; +use tracing::debug; + use tokio::task::JoinHandle; /// Count the number of connections in a connections_map object @@ -215,6 +217,31 @@ where .map(|item| (item.key().clone(), item.value().clone())) } + pub(crate) fn clear_refresh_state(&mut self) { + let addresses: Vec = self + .refresh_operations + .iter() + .map(|entry| entry.key().clone()) + .collect(); + + debug!( + "clear_refresh_state: removing all refresh data and tasks for addresses: {:?}", + addresses + ); + + for address in addresses { + if let Some((_, refresh_state)) = self.refresh_operations.remove(&address) { + // Check if handle exists before calling abort + if !refresh_state.handle.is_finished() { + refresh_state.handle.abort(); + } + } + } + + self.refresh_addresses_started.clear(); + self.refresh_addresses_done.clear(); + } + // Extends the current connection map with the provided one pub(crate) fn extend_connection_map( &mut self, diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 38228821a7..630aca3b53 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1284,6 +1284,13 @@ where } }; Box::pin(async move { + // Remove all refresh_connection data, to have a new clear state + inner + .conn_lock + .write() + .expect(MUTEX_WRITE_ERR) + .clear_refresh_state(); + let connection_map = match Self::create_initial_connections( &inner.initial_nodes, &cluster_params, From d7ff41ed900e8b12e1a06e0776169752c8c6e6ef Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 7 Jan 2025 10:47:41 +0000 Subject: [PATCH 07/17] Revert "Fix update_passwd test in python" This reverts commit 4e6535f7e89ca50a75fae24fae689c361fe7843d. 
Signed-off-by: GilboaAWS --- python/python/tests/conftest.py | 32 -------------------------------- python/python/tests/test_auth.py | 1 - 2 files changed, 33 deletions(-) diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index 9ad3ee6355..b2a97b4d0a 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -224,22 +224,6 @@ async def glide_client( await client.close() -@pytest.fixture(scope="function") -async def glide_client( - request, - cluster_mode: bool, - protocol: ProtocolVersion, - request_timeout: int, -) -> AsyncGenerator[TGlideClient, None]: - "Get async socket client for tests" - client = await create_client( - request, cluster_mode, protocol=protocol, request_timeout=request_timeout - ) - yield client - await test_teardown(request, cluster_mode, protocol) - await client.close() - - @pytest.fixture(scope="function") async def management_client( request, @@ -253,22 +237,6 @@ async def management_client( await client.close() -@pytest.fixture(scope="function") -async def management_client( - request, - cluster_mode: bool, - protocol: ProtocolVersion, - request_timeout: int, -) -> AsyncGenerator[TGlideClient, None]: - "Get async socket client for tests, used to manage the state when tests are on the client ability to connect" - client = await create_client( - request, cluster_mode, protocol=protocol, request_timeout=request_timeout - ) - yield client - await test_teardown(request, cluster_mode, protocol) - await client.close() - - async def create_client( request, cluster_mode: bool, diff --git a/python/python/tests/test_auth.py b/python/python/tests/test_auth.py index 612c04786f..7e3fc67851 100644 --- a/python/python/tests/test_auth.py +++ b/python/python/tests/test_auth.py @@ -36,7 +36,6 @@ async def cleanup(self, request, management_client: TGlideClient): @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - @pytest.mark.parametrize("request_timeout", [3000]) async def test_update_connection_password( self, glide_client: TGlideClient, management_client: TGlideClient ): From bf867ccf521d63d23e414e0d5b581e3c40747a43 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 7 Jan 2025 11:01:08 +0000 Subject: [PATCH 08/17] refresh slot after reconnect to initials as a update_password test fails because all connection are dropped and the refresh task take longer than the request timeout. Signed-off-by: GilboaAWS --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 630aca3b53..c688fdd0fb 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1311,7 +1311,7 @@ where .extend_connection_map(connection_map); if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries( inner.clone(), - &RefreshPolicy::Throttable, + &RefreshPolicy::NotThrottable, ) .await { From a12837a9e0aa79304036f23a8768f3be9395f2db Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Wed, 8 Jan 2025 10:53:01 +0000 Subject: [PATCH 09/17] 1. Replaced DashMap to HashMap 2. Moved refresh data to a struct 3. 
Returned the refresh_connection logic of sending the connection but without removing it from the connection_map Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 104 ++++---- .../redis-rs/redis/src/cluster_async/mod.rs | 239 ++++++++++++------ 2 files changed, 209 insertions(+), 134 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index a18ccfc35b..bd5ed5b509 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -2,9 +2,10 @@ use crate::cluster_async::ConnectionFuture; use crate::cluster_routing::{Route, ShardAddrs, SlotAddr}; use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; use crate::cluster_topology::TopologyHash; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; +use std::collections::{HashMap, HashSet}; use std::net::IpAddr; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -125,11 +126,6 @@ pub(crate) enum ConnectionType { pub(crate) struct ConnectionsMap(pub(crate) DashMap>); -pub(crate) struct RefreshState { - pub handle: JoinHandle<()>, // The currect running refresh task - pub node_conn: Option>, // The refreshed connection after the task is done -} - impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for item in self.0.iter() { @@ -143,18 +139,59 @@ impl std::fmt::Display for ConnectionsMap { } } +// This struct is used to track the status of each address refresh +pub(crate) struct RefreshConnectionStates { + // Holds all the failed addresses that started a refresh task. + pub(crate) refresh_addresses_started: HashSet, + // Follow the refresh ops on the connections + pub(crate) refresh_address_in_progress: HashMap>, + // Holds all the refreshed addresses that are ready to be inserted into the connection_map + pub(crate) refresh_addresses_done: HashMap>>, +} + +impl RefreshConnectionStates { + pub(crate) fn clear_refresh_state(&mut self) { + let addresses: Vec = self + .refresh_address_in_progress + .iter() + .map(|entry| entry.0.clone()) + .collect(); + + debug!( + "clear_refresh_state: removing all refresh data and tasks for addresses: {:?}", + addresses + ); + + for address in addresses { + if let Some(refresh_task) = self.refresh_address_in_progress.remove(&address) { + // Check if handle exists before calling abort + if !refresh_task.is_finished() { + refresh_task.abort(); + } + } + } + + self.refresh_addresses_started.clear(); + self.refresh_addresses_done.clear(); + } +} + +impl Default for RefreshConnectionStates { + fn default() -> Self { + Self { + refresh_addresses_started: HashSet::new(), + refresh_address_in_progress: HashMap::new(), + refresh_addresses_done: HashMap::new(), + } + } +} + pub(crate) struct ConnectionsContainer { connection_map: DashMap>, pub(crate) slot_map: SlotMap, read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, - - // Holds all the failed addresses that started a refresh task. 
- pub(crate) refresh_addresses_started: DashSet, - // Follow the refresh ops on the connections - pub(crate) refresh_operations: DashMap>, - // Holds all the refreshed addresses that are ready to be inserted into the connection_map - pub(crate) refresh_addresses_done: DashSet, + pub(crate) refresh_conn_state: RefreshConnectionStates, } impl Drop for ConnectionsContainer { @@ -171,9 +208,7 @@ impl Default for ConnectionsContainer { slot_map: Default::default(), read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary, topology_hash: 0, - refresh_addresses_started: DashSet::new(), - refresh_operations: DashMap::new(), - refresh_addresses_done: DashSet::new(), + refresh_conn_state: Default::default(), } } } @@ -201,9 +236,7 @@ where slot_map, read_from_replica_strategy, topology_hash, - refresh_addresses_started: DashSet::new(), - refresh_operations: DashMap::new(), - refresh_addresses_done: DashSet::new(), + refresh_conn_state: Default::default(), } } @@ -217,31 +250,6 @@ where .map(|item| (item.key().clone(), item.value().clone())) } - pub(crate) fn clear_refresh_state(&mut self) { - let addresses: Vec = self - .refresh_operations - .iter() - .map(|entry| entry.key().clone()) - .collect(); - - debug!( - "clear_refresh_state: removing all refresh data and tasks for addresses: {:?}", - addresses - ); - - for address in addresses { - if let Some((_, refresh_state)) = self.refresh_operations.remove(&address) { - // Check if handle exists before calling abort - if !refresh_state.handle.is_finished() { - refresh_state.handle.abort(); - } - } - } - - self.refresh_addresses_started.clear(); - self.refresh_addresses_done.clear(); - } - // Extends the current connection map with the provided one pub(crate) fn extend_connection_map( &mut self, @@ -619,9 +627,7 @@ mod tests { connection_map, read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()), topology_hash: 0, - refresh_addresses_started: DashSet::new(), - refresh_operations: DashMap::new(), - refresh_addresses_done: DashSet::new(), + refresh_conn_state: Default::default(), } } @@ -678,9 +684,7 @@ mod tests { connection_map, read_from_replica_strategy: strategy, topology_hash: 0, - refresh_addresses_started: DashSet::new(), - refresh_operations: DashMap::new(), - refresh_addresses_done: DashSet::new(), + refresh_conn_state: Default::default(), } } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index c688fdd0fb..10d461d676 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -40,7 +40,6 @@ use crate::{ commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, FromRedisValue, InfoDict, }; -use connections_container::RefreshState; use dashmap::DashMap; use std::{ collections::{HashMap, HashSet}, @@ -1284,13 +1283,6 @@ where } }; Box::pin(async move { - // Remove all refresh_connection data, to have a new clear state - inner - .conn_lock - .write() - .expect(MUTEX_WRITE_ERR) - .clear_refresh_state(); - let connection_map = match Self::create_initial_connections( &inner.initial_nodes, &cluster_params, @@ -1375,6 +1367,7 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, + false, ) .await; } @@ -1384,14 +1377,19 @@ where inner: Arc>, addresses: HashSet, conn_type: RefreshConnectionType, + check_existing_conn: bool, ) { info!("Started refreshing connections to {:?}", addresses); - let connections_container = 
inner.conn_lock.read().expect(MUTEX_READ_ERR); - let refresh_ops_map = &connections_container.refresh_operations; - for address in addresses { - if refresh_ops_map.contains_key(&address) { + if inner + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_address_in_progress + .contains_key(&address) + { info!("Skipping refresh for {}: already in progress", address); continue; } @@ -1406,12 +1404,24 @@ where address_clone_for_task ); let _ = async { + let node_option = if check_existing_conn { + let connections_container = + inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + connections_container + .connection_map() + .get(&address_clone_for_task) + .map(|node| node.value().clone()) + } else { + None + }; + // Add this address to be removed in poll_flush so all requests see a consistent connection map. // See next comment for elaborated explanation. inner_clone .conn_lock - .read() + .write() .expect(MUTEX_READ_ERR) + .refresh_conn_state .refresh_addresses_started .insert(address_clone_for_task.clone()); @@ -1427,55 +1437,55 @@ where let node_result = get_or_create_conn( &address_clone_for_task, - None, + node_option, &cluster_params, conn_type, inner_clone.glide_connection_options.clone(), ) .await; + // Maintain the newly refreshed connection separately from the main connection map. + // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. + // This approach ensures that all requests within the current batch interact with a consistent connection map, + // preventing potential reordering issues. + // + // By delaying the integration of the refreshed connection: + // + // 1. We maintain consistency throughout the processing of a batch of requests. + // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations. + // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior. + // + // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is + // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information + // with the requirement for consistent request handling within each processing cycle. match node_result { Ok(node) => { - // Maintain the newly refreshed connection separately from the main connection map. - // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. - // This approach ensures that all requests within the current batch interact with a consistent connection map, - // preventing potential reordering issues. - // - // By delaying the integration of the refreshed connection: - // - // 1. We maintain consistency throughout the processing of a batch of requests. - // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations. - // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior. - // - // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is - // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information - // with the requirement for consistent request handling within each processing cycle. 
debug!( "Succeeded to refresh connection for node {}.", address_clone_for_task ); - let connection_container = - inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); - if let Some(mut refresh_state) = connection_container - .refresh_operations - .get_mut(&address_clone_for_task) - { - refresh_state.node_conn = Some(node); - }; + inner_clone + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_addresses_done + .insert(address_clone_for_task, Some(node)); } Err(err) => { warn!( "Failed to refresh connection for node {}. Error: `{:?}`", address_clone_for_task, err ); + inner_clone + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_addresses_done + .insert(address_clone_for_task, None); } } - inner_clone - .conn_lock - .read() - .expect(MUTEX_READ_ERR) - .refresh_addresses_done - .insert(address_clone_for_task); } .await; @@ -1487,12 +1497,16 @@ where "Inserting tokio task to refresh_ops map of address {:?}", address.clone() ); - refresh_ops_map.insert( - address, - RefreshState { - handle, - node_conn: None, - }, + inner + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_address_in_progress + .insert(address.clone(), handle); + info!( + "Inserted tokio task to refresh_ops map of address {:?}", + address.clone() ); } debug!("refresh connection tasks initiated"); @@ -1833,6 +1847,7 @@ where inner.clone(), addrs_to_refresh, RefreshConnectionType::AllConnections, + false, ) .await; } @@ -1868,6 +1883,7 @@ where inner, failed_connections.into_iter().collect::>(), RefreshConnectionType::OnlyManagementConnection, + true, ) .await; } @@ -1966,6 +1982,9 @@ where info!("refresh_slots found nodes:\n{new_connections}"); // Reset the current slot map and connection vector with the new ones let mut write_guard = inner.conn_lock.write().expect(MUTEX_WRITE_ERR); + // Clear the refresh tasks of the prev instance + // TODO - Maybe we can take the running refresh tasks and use them instead of running new connection creation + write_guard.refresh_conn_state.clear_refresh_state(); let read_from_replicas = inner .get_cluster_param(|params| params.read_from_replicas.clone()) .expect(MUTEX_READ_ERR); @@ -2343,6 +2362,7 @@ where core, HashSet::from_iter(once(addr)), RefreshConnectionType::AllConnections, + false, ) .await; return Err(RedisError::from(( @@ -2448,64 +2468,114 @@ where fn update_refreshed_connection(&mut self) { trace!("update_refreshed_connection started"); loop { - let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); + let connections_container = self.inner.conn_lock.read().expect(MUTEX_READ_ERR); // Check if both sets are empty - if connections_container.refresh_addresses_started.is_empty() - && connections_container.refresh_addresses_done.is_empty() + if connections_container + .refresh_conn_state + .refresh_addresses_started + .is_empty() + && connections_container + .refresh_conn_state + .refresh_addresses_done + .is_empty() { break; } - // Process refresh_addresses_started let addresses_to_remove: Vec = connections_container + .refresh_conn_state .refresh_addresses_started .iter() - .map(|r| r.key().clone()) + .cloned() + .collect(); + + let addresses_done: Vec = connections_container + .refresh_conn_state + .refresh_addresses_done + .keys() + .cloned() .collect(); + + let current_existing_addresses_in_slot_map = + connections_container.slot_map.all_node_addresses(); + + drop(connections_container); + + // Process refresh_addresses_started for address in addresses_to_remove { 
- connections_container + self.inner + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state .refresh_addresses_started .remove(&address); - connections_container.remove_node(&address); + self.inner + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .remove_node(&address); } // Process refresh_addresses_done - let addresses_done: Vec = connections_container - .refresh_addresses_done - .iter() - .map(|r| r.key().clone()) - .collect(); for address in addresses_done { - connections_container + // Check if this address appears in the current topology + if current_existing_addresses_in_slot_map.contains(&address) { + // Check if the address exists in refresh_addresses_done + let mut conn_lock_write = self.inner.conn_lock.write().expect(MUTEX_READ_ERR); + if let Some(conn_option) = conn_lock_write + .refresh_conn_state + .refresh_addresses_done + .get_mut(&address) + { + // Match the content of the Option + match conn_option.take() { + Some(conn) => { + debug!( + "update_refreshed_connection: found refreshed connection for address {}", + address + ); + // Move the node_conn to the function + conn_lock_write + .replace_or_add_connection_for_address(address.clone(), conn); + } + None => { + debug!( + "update_refreshed_connection: task completed, but no connection for address {}", + address + ); + } + } + } + } + + // Remove this address from refresh_addresses_done + self.inner + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state .refresh_addresses_done .remove(&address); - if let Some(mut refresh_state) = - connections_container.refresh_operations.get_mut(&address) + // Remove this entry from refresh_address_in_progress + if self + .inner + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_address_in_progress + .remove(&address) + .is_none() { - // Take the node_conn out of RefreshState, replacing it with None - if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { - debug!( - "update_refreshed_connection: replacing/adding the connection: {}", - address - ); - // Move the node_conn to the function - connections_container - .replace_or_add_connection_for_address(address.clone(), node_conn); - } else { - debug!("update_refreshed_connection: reconnection failed, no connection for address: {}", address); - } - } - // Remove this entry from refresh_ops_map - match connections_container.refresh_operations.remove(&address) { - Some(_) => (), - None => warn!("update_refreshed_connection: No refresh operation found to remove for address: {:?}", address), + warn!( + "update_refreshed_connection: No refresh operation found to remove for address: {:?}", + address + ); } } - - // Release the lock before the next iteration - drop(connections_container); } } @@ -2754,7 +2824,8 @@ where ClusterConnInner::refresh_connections( self.inner.clone(), addresses, - RefreshConnectionType::AllConnections, + RefreshConnectionType::OnlyUserConnection, + true, ), ))); } From 7b84fb0d007fd04207425318382dc2d919948378 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Sun, 12 Jan 2025 14:07:06 +0000 Subject: [PATCH 10/17] Adding address to be removed from the refresh_connection instead from the tokio refresh task Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 10d461d676..6fe1c6da08 100644 --- 
a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1398,6 +1398,16 @@ where let address_clone = address.clone(); let address_clone_for_task = address.clone(); + // Add this address to be removed in poll_flush so all requests see a consistent connection map. + // See next comment for elaborated explanation. + inner_clone + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_addresses_started + .insert(address_clone_for_task.clone()); + let handle = tokio::spawn(async move { info!( "Refreshing connection task to {:?} started", @@ -1415,16 +1425,6 @@ where None }; - // Add this address to be removed in poll_flush so all requests see a consistent connection map. - // See next comment for elaborated explanation. - inner_clone - .conn_lock - .write() - .expect(MUTEX_READ_ERR) - .refresh_conn_state - .refresh_addresses_started - .insert(address_clone_for_task.clone()); - let mut cluster_params = inner_clone .cluster_params .read() @@ -2549,6 +2549,9 @@ where } } } + else { + debug!("update_refreshed_connection: address {:?} current_existing_addresses_in_slot_map: {:?}", address, current_existing_addresses_in_slot_map); + } // Remove this address from refresh_addresses_done self.inner From be41dc51b89abd01e1ffe2e723fbb068fb1d73bd Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Mon, 13 Jan 2025 08:20:38 +0000 Subject: [PATCH 11/17] lint fixes Signed-off-by: GilboaAWS --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 6fe1c6da08..f72626782f 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2548,9 +2548,8 @@ where } } } - } - else { - debug!("update_refreshed_connection: address {:?} current_existing_addresses_in_slot_map: {:?}", address, current_existing_addresses_in_slot_map); + } else { + debug!("update_refreshed_connection: address {:?} doesn't appear in addresses in slot_map: {:?}", address, current_existing_addresses_in_slot_map); } // Remove this address from refresh_addresses_done From e87e0cb0c9f7781dfce5f03929a8d0c0f4b546e6 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 14 Jan 2025 08:48:21 +0000 Subject: [PATCH 12/17] Add refresh task status to indicate if the task is running too long Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 143 +++++++++++++++--- .../redis-rs/redis/src/cluster_async/mod.rs | 105 ++++++++++--- 2 files changed, 209 insertions(+), 39 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index bd5ed5b509..5f588e6fb4 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -13,6 +13,7 @@ use telemetrylib::Telemetry; use tracing::debug; +use tokio::sync::Notify; use tokio::task::JoinHandle; /// Count the number of connections in a connections_map object @@ -139,38 +140,146 @@ impl std::fmt::Display for ConnectionsMap { } } +#[derive(Clone, Debug)] +pub(crate) struct RefreshTaskNotifier { + notify: Arc, +} + +impl RefreshTaskNotifier { + fn new() -> Self { + RefreshTaskNotifier { + notify: Arc::new(Notify::new()), + } + } + + pub fn get_notifier(&self) -> Arc { + self.notify.clone() + } 
+ + pub fn notify(&self) { + self.notify.notify_waiters(); + } +} + +// Enum representing the task status during a connection refresh. +// +// - **Reconnecting**: +// Indicates that a refresh task is in progress. This status includes a dedicated +// notifier (`RefreshTaskNotifier`) so that other tasks can wait for the connection +// to be refreshed before proceeding. +// +// - **ReconnectingTooLong**: +// Represents a situation where a refresh task has taken too long to complete. +// The status transitions from `Reconnecting` to `ReconnectingTooLong` under specific +// conditions (e.g., after one attempt of reconnecting inside the task or after a timeout). +// +// The transition from `Reconnecting` to `ReconnectingTooLong` is managed exclusively +// within the `update_refreshed_connection` function in `poll_flush`. This ensures that +// all requests maintain a consistent view of the connections. +// +// When transitioning from `Reconnecting` to `ReconnectingTooLong`, the associated +// notifier is triggered to unblock all awaiting tasks. +#[derive(Clone, Debug)] +pub(crate) enum RefreshTaskStatus { + // The task is actively reconnecting. Includes a notifier for tasks to wait on. + Reconnecting(RefreshTaskNotifier), + // The task has exceeded the allowed reconnection time. + ReconnectingTooLong, +} + +impl Drop for RefreshTaskStatus { + fn drop(&mut self) { + if let RefreshTaskStatus::Reconnecting(notifier) = self { + debug!("RefreshTaskStatus: Dropped while in Reconnecting status. Notifying tasks."); + notifier.notify(); + } + } +} + +impl RefreshTaskStatus { + /// Creates a new `RefreshTaskStatus` in the `Reconnecting` status with a fresh `RefreshTaskNotifier`. + pub fn new() -> Self { + debug!("RefreshTaskStatus: Initialized in Reconnecting status with a new notifier."); + RefreshTaskStatus::Reconnecting(RefreshTaskNotifier::new()) + } + + // Transitions the current status from `Reconnecting` to `ReconnectingTooLong` in place. + // + // If the current status is `Reconnecting`, this method notifies all waiting tasks + // using the embedded `RefreshTaskNotifier` and updates the status to `ReconnectingTooLong`. + // + // If the status is already `ReconnectingTooLong`, this method does nothing. + pub fn flip_state(&mut self) { + if let RefreshTaskStatus::Reconnecting(notifier) = self { + debug!( + "RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong." + ); + notifier.notify(); + *self = RefreshTaskStatus::ReconnectingTooLong; + } else { + debug!("RefreshTaskStatus: Already in ReconnectingTooLong status."); + } + } +} + +// Struct combining the task handle and its status +#[derive(Debug)] +pub(crate) struct RefreshTaskState { + pub handle: JoinHandle<()>, + pub status: RefreshTaskStatus, +} + +impl RefreshTaskState { + // Creates a new `RefreshTaskState` with a `Reconnecting` state and a new notifier. + pub fn new(handle: JoinHandle<()>) -> Self { + debug!("RefreshTaskState: Creating a new instance with a Reconnecting state."); + RefreshTaskState { + handle, + status: RefreshTaskStatus::new(), + } + } +} + +impl Drop for RefreshTaskState { + fn drop(&mut self) { + if let RefreshTaskStatus::Reconnecting(ref notifier) = self.status { + debug!("RefreshTaskState: Dropped while in Reconnecting status. 
Notifying tasks."); + notifier.notify(); + } else { + debug!("RefreshTaskState: Dropped while in ReconnectingTooLong status."); + } + + // Abort the task handle if it's not yet finished + if !self.handle.is_finished() { + debug!("RefreshTaskState: Aborting unfinished task."); + self.handle.abort(); + } else { + debug!("RefreshTaskState: Task already finished, no abort necessary."); + } + } +} + // This struct is used to track the status of each address refresh pub(crate) struct RefreshConnectionStates { // Holds all the failed addresses that started a refresh task. pub(crate) refresh_addresses_started: HashSet, // Follow the refresh ops on the connections - pub(crate) refresh_address_in_progress: HashMap>, + pub(crate) refresh_address_in_progress: HashMap, // Holds all the refreshed addresses that are ready to be inserted into the connection_map pub(crate) refresh_addresses_done: HashMap>>, } impl RefreshConnectionStates { pub(crate) fn clear_refresh_state(&mut self) { - let addresses: Vec = self - .refresh_address_in_progress - .iter() - .map(|entry| entry.0.clone()) - .collect(); - debug!( - "clear_refresh_state: removing all refresh data and tasks for addresses: {:?}", - addresses + "clear_refresh_state: removing all in-progress refresh connection tasks for addresses: {:?}", + self.refresh_address_in_progress.keys().collect::>() ); - for address in addresses { - if let Some(refresh_task) = self.refresh_address_in_progress.remove(&address) { - // Check if handle exists before calling abort - if !refresh_task.is_finished() { - refresh_task.abort(); - } - } - } + // Clear the entire map; Drop handles the cleanup + self.refresh_address_in_progress.clear(); + // Clear other state tracking self.refresh_addresses_started.clear(); self.refresh_addresses_done.clear(); } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index f72626782f..69683fda97 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -40,6 +40,7 @@ use crate::{ commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, FromRedisValue, InfoDict, }; +use connections_container::{RefreshTaskState, RefreshTaskStatus}; use dashmap::DashMap; use std::{ collections::{HashMap, HashSet}, @@ -1366,8 +1367,8 @@ where Self::refresh_connections( inner.clone(), addrs_to_refresh, - RefreshConnectionType::AllConnections, - false, + RefreshConnectionType::OnlyUserConnection, + true, ) .await; } @@ -1477,6 +1478,8 @@ where "Failed to refresh connection for node {}. Error: `{:?}`", address_clone_for_task, err ); + // TODO - When we move to retry more than once, we add this address to a new set of running to long, and then only move + // the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection. 
inner_clone .conn_lock .write() @@ -1493,21 +1496,13 @@ where }); // Keep the task handle into the RefreshState of this address - info!( - "Inserting tokio task to refresh_ops map of address {:?}", - address.clone() - ); inner .conn_lock .write() .expect(MUTEX_READ_ERR) .refresh_conn_state .refresh_address_in_progress - .insert(address.clone(), handle); - info!( - "Inserted tokio task to refresh_ops map of address {:?}", - address.clone() - ); + .insert(address.clone(), RefreshTaskState::new(handle)); } debug!("refresh connection tasks initiated"); } @@ -2356,19 +2351,81 @@ where let (address, mut conn) = match conn_check { ConnectionCheck::Found((address, connection)) => (address, connection.await), - ConnectionCheck::OnlyAddress(addr) => { - // No connection in for this address in the conn_map + ConnectionCheck::OnlyAddress(address) => { + // No connection for this address in the conn_map Self::refresh_connections( - core, - HashSet::from_iter(once(addr)), + core.clone(), + HashSet::from_iter(once(address.clone())), RefreshConnectionType::AllConnections, false, ) .await; - return Err(RedisError::from(( - ErrorKind::ConnectionNotFoundForRoute, - "No connection for the address, started a refresh task", - ))); + + let reconnect_notifier: Option> = match core + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_address_in_progress + .get(&address) + { + Some(refresh_task_state) => { + match &refresh_task_state.status { + // If the task status is `Reconnecting`, grab the notifier. + RefreshTaskStatus::Reconnecting(refresh_notifier) => { + Some(refresh_notifier.get_notifier()) + } + RefreshTaskStatus::ReconnectingTooLong => { + debug!( + "get_connection: Address {} is in ReconnectingTooLong state, skipping notifier wait.", + address + ); + None + } + } + } + None => { + debug!( + "get_connection: No refresh task found in progress for address: {}", + address + ); + None + } + }; + + let mut conn_option = None; + if let Some(refresh_notifier) = reconnect_notifier { + debug!( + "get_connection: Waiting on the refresh notifier for address: {}", + address + ); + // Wait for the refresh task to notify that it's done reconnecting (or transitioning). 
+ refresh_notifier.notified().await; + debug!( + "get_connection: After waiting on the refresh notifier for address: {}", + address + ); + + conn_option = core + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .connection_for_address(&address); + } + + if let Some((address, conn)) = conn_option { + debug!("get_connection: Connection found for address: {}", address); + // If found, return the connection + (address, conn.await) + } else { + // Otherwise, return an error indicating the connection wasn't found + return Err(( + ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + address, + ) + .into()); + } } ConnectionCheck::RandomConnection => { let random_conn = core @@ -2562,7 +2619,7 @@ where .remove(&address); // Remove this entry from refresh_address_in_progress - if self + if let Some(_) = self .inner .conn_lock .write() @@ -2570,10 +2627,14 @@ where .refresh_conn_state .refresh_address_in_progress .remove(&address) - .is_none() { + debug!( + "update_refreshed_connection: Successfully removed refresh state for address: {}", + address + ); + } else { warn!( - "update_refreshed_connection: No refresh operation found to remove for address: {:?}", + "update_refreshed_connection: No refresh state found to remove for address: {:?}", address ); } From 029aafc4bb7e3dd306c9008d387844cee656ccc4 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Wed, 15 Jan 2025 11:35:59 +0000 Subject: [PATCH 13/17] Updated get_connection for SpecificNode to await on refreshing connection before redirecting to a random node Signed-off-by: GilboaAWS --- .../cluster_async/connections_container.rs | 58 ++++- .../redis-rs/redis/src/cluster_async/mod.rs | 225 +++++++++++------- 2 files changed, 201 insertions(+), 82 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 5f588e6fb4..e9aa4e1201 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -184,6 +184,7 @@ pub(crate) enum RefreshTaskStatus { // The task is actively reconnecting. Includes a notifier for tasks to wait on. Reconnecting(RefreshTaskNotifier), // The task has exceeded the allowed reconnection time. + #[allow(dead_code)] ReconnectingTooLong, } @@ -209,7 +210,8 @@ impl RefreshTaskStatus { // using the embedded `RefreshTaskNotifier` and updates the status to `ReconnectingTooLong`. // // If the status is already `ReconnectingTooLong`, this method does nothing. - pub fn flip_state(&mut self) { + #[allow(dead_code)] + pub fn flip_status_to_too_long(&mut self) { if let RefreshTaskStatus::Reconnecting(notifier) = self { debug!( "RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong." @@ -220,6 +222,15 @@ impl RefreshTaskStatus { debug!("RefreshTaskStatus: Already in ReconnectingTooLong status."); } } + + pub fn notify_waiting_requests(&mut self) { + if let RefreshTaskStatus::Reconnecting(notifier) = self { + debug!("RefreshTaskStatus::notify_waiting_requests notify"); + notifier.notify(); + } else { + debug!("RefreshTaskStatus::notify_waiting_requests - ReconnectingTooLong status."); + } + } } // Struct combining the task handle and its status @@ -501,6 +512,51 @@ where }) } + // Fetches the master address for a given route. + // Returns `None` if no master address can be resolved. 
+ pub(crate) fn address_for_route(&self, route: &Route) -> Option { + let slot_map_value = self.slot_map.slot_value_for_route(route)?; + Some(slot_map_value.addrs.primary().clone().to_string()) + } + + // Retrieves the notifier for a reconnect task associated with a given route. + // Returns `Some(Arc)` if a reconnect task is in the `Reconnecting` state. + // Returns `None` if: + // - There is no refresh task for the route's address. + // - The reconnect task is in `ReconnectingTooLong` state, with a debug log for clarity. + pub(crate) fn notifier_for_route(&self, route: &Route) -> Option> { + let address = self.address_for_route(route)?; + + if let Some(task_state) = self + .refresh_conn_state + .refresh_address_in_progress + .get(&address) + { + match &task_state.status { + RefreshTaskStatus::Reconnecting(notifier) => { + debug!( + "notifier_for_route: Found reconnect notifier for address: {}", + address + ); + Some(notifier.get_notifier()) + } + RefreshTaskStatus::ReconnectingTooLong => { + debug!( + "notifier_for_route: Address {} is in ReconnectingTooLong state. No notifier will be returned.", + address + ); + None + } + } + } else { + debug!( + "notifier_for_route: No refresh task exists for address: {}. No notifier will be returned.", + address + ); + None + } + } + pub(crate) fn all_node_connections( &self, ) -> impl Iterator> + '_ { diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 69683fda97..36334d497d 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1414,83 +1414,99 @@ where "Refreshing connection task to {:?} started", address_clone_for_task ); - let _ = async { - let node_option = if check_existing_conn { - let connections_container = - inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); - connections_container - .connection_map() - .get(&address_clone_for_task) - .map(|node| node.value().clone()) - } else { - None - }; - let mut cluster_params = inner_clone - .cluster_params - .read() - .expect(MUTEX_READ_ERR) - .clone(); - let subs_guard = inner_clone.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = - subs_guard.get(&address_clone_for_task).cloned(); - drop(subs_guard); + let node_option = if check_existing_conn { + let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + connections_container + .connection_map() + .get(&address_clone_for_task) + .map(|node| node.value().clone()) + } else { + None + }; - let node_result = get_or_create_conn( - &address_clone_for_task, - node_option, - &cluster_params, - conn_type, - inner_clone.glide_connection_options.clone(), - ) - .await; + let mut cluster_params = inner_clone + .cluster_params + .read() + .expect(MUTEX_READ_ERR) + .clone(); + let subs_guard = inner_clone.subscriptions_by_address.read().await; + cluster_params.pubsub_subscriptions = + subs_guard.get(&address_clone_for_task).cloned(); + drop(subs_guard); + + let node_result = get_or_create_conn( + &address_clone_for_task, + node_option, + &cluster_params, + conn_type, + inner_clone.glide_connection_options.clone(), + ) + .await; - // Maintain the newly refreshed connection separately from the main connection map. - // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. 
- // This approach ensures that all requests within the current batch interact with a consistent connection map, - // preventing potential reordering issues. - // - // By delaying the integration of the refreshed connection: - // - // 1. We maintain consistency throughout the processing of a batch of requests. - // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations. - // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior. - // - // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is - // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information - // with the requirement for consistent request handling within each processing cycle. - match node_result { - Ok(node) => { - debug!( - "Succeeded to refresh connection for node {}.", - address_clone_for_task - ); - inner_clone - .conn_lock - .write() - .expect(MUTEX_READ_ERR) - .refresh_conn_state - .refresh_addresses_done - .insert(address_clone_for_task, Some(node)); - } - Err(err) => { - warn!( - "Failed to refresh connection for node {}. Error: `{:?}`", - address_clone_for_task, err - ); - // TODO - When we move to retry more than once, we add this address to a new set of running to long, and then only move - // the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection. - inner_clone - .conn_lock - .write() - .expect(MUTEX_READ_ERR) - .refresh_conn_state - .refresh_addresses_done - .insert(address_clone_for_task, None); - } + // Maintain the newly refreshed connection separately from the main connection map. + // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation. + // This approach ensures that all requests within the current batch interact with a consistent connection map, + // preventing potential reordering issues. + // + // By delaying the integration of the refreshed connection: + // + // 1. We maintain consistency throughout the processing of a batch of requests. + // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations. + // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior. + // + // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is + // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information + // with the requirement for consistent request handling within each processing cycle. + match node_result { + Ok(node) => { + debug!( + "Succeeded to refresh connection for node {}.", + address_clone_for_task + ); + inner_clone + .conn_lock + .write() + .expect(MUTEX_READ_ERR) + .refresh_conn_state + .refresh_addresses_done + .insert(address_clone_for_task.clone(), Some(node)); + } + Err(err) => { + warn!( + "Failed to refresh connection for node {}. Error: `{:?}`", + address_clone_for_task, err + ); + // TODO - When we move to retry more than once, we add this address to a new set of running to long, and then only move + // the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection. 
+                        inner_clone
+                            .conn_lock
+                            .write()
+                            .expect(MUTEX_READ_ERR)
+                            .refresh_conn_state
+                            .refresh_addresses_done
+                            .insert(address_clone_for_task.clone(), None);
                 }
             }
-            .await;
+
+            // Notify the awaiting requests here in order to wake the poll_flush context, as
+            // it awaits on this notifier inside get_connection, in poll_next, inside poll_complete.
+            // Otherwise poll_flush won't be polled until the next start_send or other request I/O.
+            if let Some(task_state) = inner_clone
+                .conn_lock
+                .write()
+                .expect(MUTEX_READ_ERR)
+                .refresh_conn_state
+                .refresh_address_in_progress
+                .get_mut(&address_clone_for_task)
+            {
+                task_state.status.notify_waiting_requests();
+            } else {
+                warn!(
+                    "No refresh task state found for address: {}",
+                    address_clone_for_task
+                );
+            }
 
             info!("Refreshing connection task to {:?} is done", address_clone);
         });
@@ -2296,13 +2312,16 @@ where
                 )
             }
             InternalSingleNodeRouting::SpecificNode(route) => {
-                if let Some((conn, address)) = core
-                    .conn_lock
-                    .read()
-                    .expect(MUTEX_READ_ERR)
-                    .connection_for_route(&route)
-                {
-                    ConnectionCheck::Found((conn, address))
+                // Step 1: Attempt to get the connection directly using the route.
+                let conn_check = {
+                    let conn_lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
+                    conn_lock
+                        .connection_for_route(&route)
+                        .map(ConnectionCheck::Found)
+                };
+
+                if let Some(conn_check) = conn_check {
+                    conn_check
                 } else {
                     // No connection is found for the given route:
                     // - For key-based commands, attempt redirection to a random node,
@@ -2310,6 +2329,9 @@ where
                     // - For non-key-based commands, avoid attempting redirection to a random node
                     // as it wouldn't result in MOVED hints and can lead to unwanted results
                     // (e.g., sending management command to a different node than the user asked for); instead, raise the error.
+                    let mut conn_check = ConnectionCheck::RandomConnection;
+
+                    // Step 2: Handle cases where no connection is found for the route.
                    let routable_cmd = cmd.and_then(|cmd| Routable::command(&*cmd));
                    if routable_cmd.is_some()
                        && !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap())
@@ -2320,10 +2342,51 @@ where
                            format!("{route:?}"),
                        )
                        .into());
+                    }
+
+                    debug!(
+                        "SpecificNode: No connection found for route `{route:?}`. Checking for reconnect tasks before redirecting to a random node."
+                    );
+
+                    // Step 3: Obtain the reconnect notifier, ensuring the lock is released immediately after.
+                    let reconnect_notifier = {
+                        let conn_lock = core.conn_lock.write().expect(MUTEX_READ_ERR);
+                        conn_lock.notifier_for_route(&route).clone()
+                    };
+
+                    // Step 4: If a notifier exists, wait for it to signal completion.
+                    if let Some(notifier) = reconnect_notifier {
+                        debug!(
+                            "SpecificNode: Waiting on reconnect notifier for route `{route:?}`."
+                        );
+
+                        // Drop the lock before awaiting
+                        notifier.notified().await;
+
+                        debug!(
+                            "SpecificNode: Finished waiting on notifier for route `{route:?}`. Retrying connection lookup."
+                        );
+
+                        // Step 5: Retry the connection lookup after waiting for the reconnect task.
+                        if let Some((conn, address)) = core
+                            .conn_lock
+                            .read()
+                            .expect(MUTEX_READ_ERR)
+                            .connection_for_route(&route)
+                        {
+                            conn_check = ConnectionCheck::Found((conn, address));
+                        } else {
+                            debug!(
+                                "SpecificNode: No connection found for route `{route:?}` after waiting on reconnect notifier. Proceeding to random node."
+                            );
+                        }
                    } else {
-                        warn!("No connection found for route `{route:?}`.
Attempting redirection to a random node.");
-                        ConnectionCheck::RandomConnection
+                        debug!(
+                            "SpecificNode: No active reconnect task for route `{route:?}`. Proceeding to random node."
+                        );
                    }
+
+                    conn_check
                }
            }
            InternalSingleNodeRouting::Random => ConnectionCheck::RandomConnection,

From 66a3e3940fb09a37073626e244d9847479c3eb5e Mon Sep 17 00:00:00 2001
From: GilboaAWS
Date: Thu, 16 Jan 2025 15:53:25 +0000
Subject: [PATCH 14/17] Fix slow path reconnect process to call update on the connection map

Signed-off-by: GilboaAWS
---
 .../cluster_async/connections_container.rs    |  35 +++++
 .../redis-rs/redis/src/cluster_async/mod.rs   | 125 +++++++++++------
 2 files changed, 112 insertions(+), 48 deletions(-)

diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
index e9aa4e1201..f52595e8b3 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
@@ -281,6 +281,12 @@ pub(crate) struct RefreshConnectionStates {
 }
 
 impl RefreshConnectionStates {
+    // Clears all ongoing refresh connection tasks and resets associated state tracking.
+    //
+    // - This method removes all entries in the `refresh_address_in_progress` map.
+    // - The `Drop` trait is responsible for notifying the associated notifiers and aborting any unfinished refresh tasks.
+    // - Additionally, this method clears `refresh_addresses_started` and `refresh_addresses_done`
+    //   to ensure no stale data remains in the refresh state tracking.
     pub(crate) fn clear_refresh_state(&mut self) {
         debug!(
             "clear_refresh_state: removing all in-progress refresh connection tasks for addresses: {:?}",
@@ -294,6 +300,35 @@ impl RefreshConnectionStates {
         self.refresh_addresses_started.clear();
         self.refresh_addresses_done.clear();
     }
+
+    // Collects the notifiers for the given addresses and returns them as a vector.
+    //
+    // This function retrieves the notifiers for the provided addresses from the `refresh_address_in_progress`
+    // map and returns them, so they can be awaited outside of the lock.
+    //
+    // # Arguments
+    // * `addresses` - A list of addresses for which notifiers are required.
+    //
+    // # Returns
+    // A vector of `tokio::sync::Notify` handles that can be awaited.
+    pub(crate) fn collect_refresh_notifiers(
+        &self,
+        addresses: &HashSet,
+    ) -> Vec> {
+        addresses
+            .iter()
+            .filter_map(|address| {
+                self.refresh_address_in_progress
+                    .get(address)
+                    .and_then(|refresh_state| match &refresh_state.status {
+                        RefreshTaskStatus::Reconnecting(notifier) => {
+                            Some(notifier.get_notifier().clone())
+                        }
+                        _ => None,
+                    })
+            })
+            .collect()
+    }
 }
 
 impl Default for RefreshConnectionStates {
diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 36334d497d..537dc40abb 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1364,23 +1364,59 @@ where
 
         if !addrs_to_refresh.is_empty() {
             // don't try existing nodes since we know a. it does not exist. b. exist but its connection is closed
-            Self::refresh_connections(
+            Self::refresh_and_update_connections(
                 inner.clone(),
                 addrs_to_refresh,
                 RefreshConnectionType::AllConnections,
                 false,
             )
             .await;
         }
     }
 
-    async fn refresh_connections(
+    // Creates refresh tasks, awaits on the tasks' notifiers, and then updates the connection_container.
+    // Awaiting on the notifier guarantees at least one reconnect attempt on each address.
+    async fn refresh_and_update_connections(
         inner: Arc>,
         addresses: HashSet,
         conn_type: RefreshConnectionType,
         check_existing_conn: bool,
     ) {
-        info!("Started refreshing connections to {:?}", addresses);
+        trace!("refresh_and_update_connections: calling trigger_refresh_connection_tasks");
+        Self::trigger_refresh_connection_tasks(
+            inner.clone(),
+            addresses.clone(),
+            conn_type,
+            check_existing_conn,
+        )
+        .await;
+
+        trace!("refresh_and_update_connections: Await on all tasks' refresh notifier");
+        // Await on all tasks' refresh notifiers, if any exist
+        let refresh_task_notifiers = inner
+            .clone()
+            .conn_lock
+            .read()
+            .expect(MUTEX_READ_ERR)
+            .refresh_conn_state
+            .collect_refresh_notifiers(&addresses);
+        let futures: Vec<_> = refresh_task_notifiers
+            .iter()
+            .map(|notify| notify.notified())
+            .collect();
+        futures::future::join_all(futures).await;
+
+        // Update the connections in the connection_container
+        Self::update_refreshed_connection(inner);
+    }
+
+    async fn trigger_refresh_connection_tasks(
+        inner: Arc>,
+        addresses: HashSet,
+        conn_type: RefreshConnectionType,
+        check_existing_conn: bool,
+    ) {
+        debug!("Triggering refresh connection tasks to {:?} ", addresses);
 
         for address in addresses {
             if inner
@@ -1854,7 +1890,7 @@ where
 
         if !addrs_to_refresh.is_empty() {
             // immediately trigger connection reestablishment
-            Self::refresh_connections(
+            Self::refresh_and_update_connections(
                 inner.clone(),
                 addrs_to_refresh,
                 RefreshConnectionType::AllConnections,
@@ -1890,7 +1926,8 @@ where
         }
 
         if !failed_connections.is_empty() {
-            Self::refresh_connections(
+            trace!("check_for_topology_diff: calling trigger_refresh_connection_tasks");
+            Self::trigger_refresh_connection_tasks(
                 inner,
                 failed_connections.into_iter().collect::>(),
                 RefreshConnectionType::OnlyManagementConnection,
@@ -2416,7 +2453,7 @@ where
             ConnectionCheck::Found((address, connection)) => (address, connection.await),
             ConnectionCheck::OnlyAddress(address) => {
                 // No connection for this address in the conn_map
-                Self::refresh_connections(
+                Self::trigger_refresh_connection_tasks(
                     core.clone(),
                     HashSet::from_iter(once(address.clone())),
                     RefreshConnectionType::AllConnections,
@@ -2518,6 +2555,7 @@ where
     }
 
     fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll> {
+        trace!("entered poll_recover");
         let recover_future = match &mut self.state {
             ConnectionState::PollComplete => return Poll::Ready(Ok(())),
             ConnectionState::Recover(future) => future,
@@ -2585,10 +2623,10 @@ where
         Self::try_request(info, core).await
     }
 
-    fn update_refreshed_connection(&mut self) {
+    fn update_refreshed_connection(inner: Arc>) {
         trace!("update_refreshed_connection started");
         loop {
-            let connections_container = self.inner.conn_lock.read().expect(MUTEX_READ_ERR);
+            let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR);
 
             // Check if both sets are empty
             if connections_container
@@ -2617,21 +2655,18 @@ where
                 .cloned()
                 .collect();
 
-            let current_existing_addresses_in_slot_map =
-                connections_container.slot_map.all_node_addresses();
-
             drop(connections_container);
 
             // Process refresh_addresses_started
             for address in addresses_to_remove {
-                self.inner
+                inner
                     .conn_lock
                     .write()
                     .expect(MUTEX_READ_ERR)
                     .refresh_conn_state
                     .refresh_addresses_started
                     .remove(&address);
-                self.inner
+                inner
                     .conn_lock
                     .write()
                     .expect(MUTEX_READ_ERR)
@@ -2640,40 +2675,35 @@ where
 
             // Process refresh_addresses_done
             for address in addresses_done {
-                // Check if this address
appears in the current topology - if current_existing_addresses_in_slot_map.contains(&address) { - // Check if the address exists in refresh_addresses_done - let mut conn_lock_write = self.inner.conn_lock.write().expect(MUTEX_READ_ERR); - if let Some(conn_option) = conn_lock_write - .refresh_conn_state - .refresh_addresses_done - .get_mut(&address) - { - // Match the content of the Option - match conn_option.take() { - Some(conn) => { - debug!( - "update_refreshed_connection: found refreshed connection for address {}", - address - ); - // Move the node_conn to the function - conn_lock_write - .replace_or_add_connection_for_address(address.clone(), conn); - } - None => { - debug!( - "update_refreshed_connection: task completed, but no connection for address {}", - address - ); - } + // Check if the address exists in refresh_addresses_done + let mut conn_lock_write = inner.conn_lock.write().expect(MUTEX_READ_ERR); + if let Some(conn_option) = conn_lock_write + .refresh_conn_state + .refresh_addresses_done + .get_mut(&address) + { + // Match the content of the Option + match conn_option.take() { + Some(conn) => { + debug!( + "update_refreshed_connection: found refreshed connection for address {}", + address + ); + // Move the node_conn to the function + conn_lock_write + .replace_or_add_connection_for_address(address.clone(), conn); + } + None => { + debug!( + "update_refreshed_connection: task completed, but no connection for address {}", + address + ); } } - } else { - debug!("update_refreshed_connection: address {:?} doesn't appear in addresses in slot_map: {:?}", address, current_existing_addresses_in_slot_map); } // Remove this address from refresh_addresses_done - self.inner + inner .conn_lock .write() .expect(MUTEX_READ_ERR) @@ -2682,8 +2712,7 @@ where .remove(&address); // Remove this entry from refresh_address_in_progress - if let Some(_) = self - .inner + if let Some(_) = inner .conn_lock .write() .expect(MUTEX_READ_ERR) @@ -2933,7 +2962,7 @@ where // In case of active poll_recovery, the work should // take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of // non-relevant addresses. 
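To make the synchronization point described in the comment above concrete, here is a minimal, self-contained sketch; it is not the patch's actual code, and ConnState with plain String stand-ins for addresses and connections is a hypothetical simplification. Refresh tasks stage their per-address result in a "done" map, and a single drain step folds the staged results into the live connection map before the next batch of requests is processed.

    use std::collections::HashMap;

    struct ConnState {
        live: HashMap<String, String>, // address -> connection (stand-in type)
        refresh_done: HashMap<String, Option<String>>, // staged results from refresh tasks
    }

    impl ConnState {
        // Called once per flush cycle: fold staged refresh results into the live map.
        fn drain_refreshed(&mut self) {
            for (addr, maybe_conn) in self.refresh_done.drain() {
                match maybe_conn {
                    // Successful refresh: replace or add the connection.
                    Some(conn) => {
                        self.live.insert(addr, conn);
                    }
                    // Refresh failed: leave the live map untouched; a later cycle may retry.
                    None => {}
                }
            }
        }
    }

    fn main() {
        let mut state = ConnState {
            live: HashMap::new(),
            refresh_done: HashMap::from([
                ("node-a:6379".to_string(), Some("conn-a".to_string())),
                ("node-b:6379".to_string(), None),
            ]),
        };
        state.drain_refreshed();
        assert_eq!(state.live.len(), 1);
    }

Because the drain happens at a single point per cycle, every request handled in that cycle observes the same map, which is the consistency property the comment above is after.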
- self.update_refreshed_connection(); + ClusterConnInner::update_refreshed_connection(self.inner.clone()); match ready!(self.poll_complete(cx)) { PollFlushAction::None => return Poll::Ready(Ok(())), @@ -2947,7 +2976,7 @@ where } PollFlushAction::Reconnect(addresses) => { self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin( - ClusterConnInner::refresh_connections( + ClusterConnInner::trigger_refresh_connection_tasks( self.inner.clone(), addresses, RefreshConnectionType::OnlyUserConnection, From 7edb54f0ea579e2ef274ae131eb0b3ac37e407c9 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 21 Jan 2025 10:01:22 +0000 Subject: [PATCH 15/17] fix deadlock Signed-off-by: GilboaAWS --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 537dc40abb..6d83cb5cf5 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2703,19 +2703,13 @@ where } // Remove this address from refresh_addresses_done - inner - .conn_lock - .write() - .expect(MUTEX_READ_ERR) + conn_lock_write .refresh_conn_state .refresh_addresses_done .remove(&address); // Remove this entry from refresh_address_in_progress - if let Some(_) = inner - .conn_lock - .write() - .expect(MUTEX_READ_ERR) + if let Some(_) = conn_lock_write .refresh_conn_state .refresh_address_in_progress .remove(&address) From 2e569a5040c43107510344c89bd8b2d8a7859adc Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Tue, 21 Jan 2025 12:26:18 +0000 Subject: [PATCH 16/17] fix trigger refresh order Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 6d83cb5cf5..0e33b4491e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1445,22 +1445,22 @@ where .refresh_addresses_started .insert(address_clone_for_task.clone()); + let node_option = if check_existing_conn { + let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); + connections_container + .connection_map() + .get(&address_clone_for_task) + .map(|node| node.value().clone()) + } else { + None + }; + let handle = tokio::spawn(async move { info!( - "Refreshing connection task to {:?} started", + "refreshing connection task to {:?} started", address_clone_for_task ); - let node_option = if check_existing_conn { - let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); - connections_container - .connection_map() - .get(&address_clone_for_task) - .map(|node| node.value().clone()) - } else { - None - }; - let mut cluster_params = inner_clone .cluster_params .read() @@ -1556,7 +1556,7 @@ where .refresh_address_in_progress .insert(address.clone(), RefreshTaskState::new(handle)); } - debug!("refresh connection tasks initiated"); + debug!("trigger_refresh_connection_tasks: Done"); } async fn aggregate_results( From 78a966f05e72510fcded57ceed491a5b42869b50 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Wed, 22 Jan 2025 06:42:37 +0000 Subject: [PATCH 17/17] fix lint for redis-rs Signed-off-by: GilboaAWS --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 0e33b4491e..3334c620fe 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2709,10 +2709,11 @@ where .remove(&address); // Remove this entry from refresh_address_in_progress - if let Some(_) = conn_lock_write + if conn_lock_write .refresh_conn_state .refresh_address_in_progress .remove(&address) + .is_some() { debug!( "update_refreshed_connection: Successfully removed refresh state for address: {}",