From b2caf01970c9f3ff57275332a6e35c74b6218bd1 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Mon, 6 Jan 2025 16:21:30 +0000 Subject: [PATCH] Bug fix Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 7bda70fbb4..38228821a7 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1454,20 +1454,21 @@ where .get_mut(&address_clone_for_task) { refresh_state.node_conn = Some(node); - } - connection_container - .refresh_addresses_done - .insert(address_clone_for_task); - Ok(()) + }; } Err(err) => { warn!( "Failed to refresh connection for node {}. Error: `{:?}`", address_clone_for_task, err ); - Err(err) } } + inner_clone + .conn_lock + .read() + .expect(MUTEX_READ_ERR) + .refresh_addresses_done + .insert(address_clone_for_task); } .await; @@ -1487,7 +1488,7 @@ where }, ); } - debug!("refresh connection tasts initiated"); + debug!("refresh connection tasks initiated"); } async fn aggregate_results( @@ -2338,7 +2339,7 @@ where ) .await; return Err(RedisError::from(( - ErrorKind::AllConnectionsUnavailable, + ErrorKind::ConnectionNotFoundForRoute, "No connection for the address, started a refresh task", ))); } @@ -2442,6 +2443,13 @@ where loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); + // Check if both sets are empty + if connections_container.refresh_addresses_started.is_empty() + && connections_container.refresh_addresses_done.is_empty() + { + break; + } + // Process refresh_addresses_started let addresses_to_remove: Vec = connections_container .refresh_addresses_started @@ -2483,14 +2491,10 @@ where } } // Remove this entry from refresh_ops_map - connections_container.refresh_operations.remove(&address); - } - - // Check if both sets are empty - if connections_container.refresh_addresses_started.is_empty() - && connections_container.refresh_addresses_done.is_empty() - { - break; + match connections_container.refresh_operations.remove(&address) { + Some(_) => (), + None => warn!("update_refreshed_connection: No refresh operation found to remove for address: {:?}", address), + } } // Release the lock before the next iteration