diff --git a/glide-core/redis-rs/redis/src/aio/connection.rs b/glide-core/redis-rs/redis/src/aio/connection.rs index 5adef7869f..2b32a7ced3 100644 --- a/glide-core/redis-rs/redis/src/aio/connection.rs +++ b/glide-core/redis-rs/redis/src/aio/connection.rs @@ -7,7 +7,7 @@ use crate::connection::{ resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo, Msg, RedisConnectionInfo, }; -#[cfg(any(feature = "tokio-comp"))] +#[cfg(feature = "tokio-comp")] use crate::parser::ValueCodec; use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value}; use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs}; diff --git a/glide-core/redis-rs/redis/src/aio/connection_manager.rs b/glide-core/redis-rs/redis/src/aio/connection_manager.rs index dce7b254a5..83d680ae53 100644 --- a/glide-core/redis-rs/redis/src/aio/connection_manager.rs +++ b/glide-core/redis-rs/redis/src/aio/connection_manager.rs @@ -78,12 +78,12 @@ macro_rules! reconnect_if_dropped { }; } -/// Handle a connection result. If there's an I/O error, reconnect. +/// Handle a connection result. If the connection has dropped, reconnect. /// Propagate any error. -macro_rules! reconnect_if_io_error { +macro_rules! reconnect_if_conn_dropped { ($self:expr, $result:expr, $current:expr) => { if let Err(e) = $result { - if e.is_io_error() { + if e.is_connection_dropped() { $self.reconnect($current); } return Err(e); @@ -249,7 +249,7 @@ impl ConnectionManager { .clone() .await .map_err(|e| e.clone_mostly("Reconnecting failed")); - reconnect_if_io_error!(self, connection_result, guard); + reconnect_if_conn_dropped!(self, connection_result, guard); let result = connection_result?.send_packed_command(cmd).await; reconnect_if_dropped!(self, &result, guard); result @@ -270,7 +270,7 @@ impl ConnectionManager { .clone() .await .map_err(|e| e.clone_mostly("Reconnecting failed")); - reconnect_if_io_error!(self, connection_result, guard); + reconnect_if_conn_dropped!(self, connection_result, guard); let result = connection_result? .send_packed_commands(cmd, offset, count) .await; diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index fb1b62f8a1..c23d4dfca4 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -349,7 +349,7 @@ where &mut self, item: SinkItem, timeout: Duration, - ) -> Result> { + ) -> Result { self.send_recv(item, None, timeout).await } @@ -359,7 +359,7 @@ where // If `None`, this is a single request, not a pipeline of multiple requests. pipeline_response_count: Option, timeout: Duration, - ) -> Result> { + ) -> Result { let (sender, receiver) = oneshot::channel(); self.sender @@ -369,15 +369,29 @@ where output: sender, }) .await - .map_err(|_| None)?; + .map_err(|err| { + // If an error occurs here, it means the request never reached the server, as guaranteed + // by the 'send' function. Since the server did not receive the data, it is safe to retry + // the request. + RedisError::from(( + crate::ErrorKind::FatalSendError, + "Failed to send the request to the server", + err.to_string(), + )) + })?; match Runtime::locate().timeout(timeout, receiver).await { - Ok(Ok(result)) => result.map_err(Some), - Ok(Err(_)) => { - // The `sender` was dropped which likely means that the stream part - // failed for one reason or another - Err(None) + Ok(Ok(result)) => result, + Ok(Err(err)) => { + // The `sender` was dropped, likely indicating a failure in the stream. + // This error suggests that it's unclear whether the server received the request before the connection failed, + // making it unsafe to retry. For example, retrying an INCR request could result in double increments. + Err(RedisError::from(( + crate::ErrorKind::FatalReceiveError, + "Failed to receive a response due to a fatal error", + err.to_string(), + ))) } - Err(elapsed) => Err(Some(elapsed.into())), + Err(elapsed) => Err(elapsed.into()), } } @@ -503,10 +517,7 @@ impl MultiplexedConnection { let result = self .pipeline .send_single(cmd.get_packed_command(), self.response_timeout) - .await - .map_err(|err| { - err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))) - }); + .await; if self.protocol != ProtocolVersion::RESP2 { if let Err(e) = &result { if e.is_connection_dropped() { @@ -537,10 +548,7 @@ impl MultiplexedConnection { Some(offset + count), self.response_timeout, ) - .await - .map_err(|err| { - err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))) - }); + .await; if self.protocol != ProtocolVersion::RESP2 { if let Err(e) = &result { diff --git a/glide-core/redis-rs/redis/src/cluster.rs b/glide-core/redis-rs/redis/src/cluster.rs index f9c76f5161..170cac47b3 100644 --- a/glide-core/redis-rs/redis/src/cluster.rs +++ b/glide-core/redis-rs/redis/src/cluster.rs @@ -43,7 +43,9 @@ use std::time::Duration; use rand::{seq::IteratorRandom, thread_rng}; +pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder}; use crate::cluster_pipeline::UNROUTABLE_ERROR; +pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline}; use crate::cluster_routing::{ MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, }; @@ -54,7 +56,7 @@ use crate::connection::{ connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo, }; use crate::parser::parse_redis_value; -use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value}; +use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, RetryMethod, Value}; pub use crate::TlsMode; // Pub for backwards compatibility use crate::{ cluster_client::ClusterParams, @@ -62,9 +64,6 @@ use crate::{ IntoConnectionInfo, PushInfo, }; -pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder}; -pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline}; - use tokio::sync::mpsc; #[cfg(feature = "tls-rustls")] @@ -749,12 +748,12 @@ where retries += 1; match err.retry_method() { - crate::types::RetryMethod::AskRedirect => { + RetryMethod::AskRedirect => { redirected = err .redirect_node() .map(|(node, _slot)| Redirect::Ask(node.to_string())); } - crate::types::RetryMethod::MovedRedirect => { + RetryMethod::MovedRedirect => { // Refresh slots. self.refresh_slots()?; // Request again. @@ -762,8 +761,8 @@ where .redirect_node() .map(|(node, _slot)| Redirect::Moved(node.to_string())); } - crate::types::RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica - | crate::types::RetryMethod::WaitAndRetry => { + RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica + | RetryMethod::WaitAndRetry => { // Sleep and retry. let sleep_time = self .cluster_params @@ -771,7 +770,7 @@ where .wait_time_for_retry(retries); thread::sleep(sleep_time); } - crate::types::RetryMethod::Reconnect => { + RetryMethod::Reconnect | RetryMethod::ReconnectAndRetry => { if *self.auto_reconnect.borrow() { if let Ok(mut conn) = self.connect(&addr) { if conn.check_connection() { @@ -780,10 +779,10 @@ where } } } - crate::types::RetryMethod::NoRetry => { + RetryMethod::NoRetry => { return Err(err); } - crate::types::RetryMethod::RetryImmediately => {} + RetryMethod::RetryImmediately => {} } } } diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 2bfbb8b934..d89d063b78 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -255,16 +255,18 @@ where &self, amount: usize, conn_type: ConnectionType, - ) -> impl Iterator> + '_ { - self.connection_map - .iter() - .choose_multiple(&mut rand::thread_rng(), amount) - .into_iter() - .map(move |item| { - let (address, node) = (item.key(), item.value()); - let conn = node.get_connection(&conn_type); - (address.clone(), conn) - }) + ) -> Option> + '_> { + (!self.connection_map.is_empty()).then_some({ + self.connection_map + .iter() + .choose_multiple(&mut rand::thread_rng(), amount) + .into_iter() + .map(move |item| { + let (address, node) = (item.key(), item.value()); + let conn = node.get_connection(&conn_type); + (address.clone(), conn) + }) + }) } pub(crate) fn replace_or_add_connection_for_address( @@ -633,6 +635,7 @@ mod tests { let random_connections: HashSet<_> = container .random_connections(3, ConnectionType::User) + .expect("No connections found") .map(|pair| pair.1) .collect(); @@ -647,12 +650,9 @@ mod tests { let container = create_container(); remove_all_connections(&container); - assert_eq!( - 0, - container - .random_connections(1, ConnectionType::User) - .count() - ); + assert!(container + .random_connections(1, ConnectionType::User) + .is_none()); } #[test] @@ -665,6 +665,7 @@ mod tests { ); let random_connections: Vec<_> = container .random_connections(1, ConnectionType::User) + .expect("No connections found") .collect(); assert_eq!(vec![(address, 4)], random_connections); @@ -675,6 +676,7 @@ mod tests { let container = create_container(); let mut random_connections: Vec<_> = container .random_connections(1000, ConnectionType::User) + .expect("No connections found") .map(|pair| pair.1) .collect(); random_connections.sort(); @@ -687,6 +689,7 @@ mod tests { let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true); let mut random_connections: Vec<_> = container .random_connections(1000, ConnectionType::PreferManagement) + .expect("No connections found") .map(|pair| pair.1) .collect(); random_connections.sort(); diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index c8628c16bb..aa9f02e1e6 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -845,6 +845,7 @@ impl Future for Request { let request = this.request.as_mut().unwrap(); // TODO - would be nice if we didn't need to repeat this code twice, with & without retries. if request.retry >= this.retry_params.number_of_retries { + let retry_method = err.retry_method(); let next = if err.kind() == ErrorKind::AllConnectionsUnavailable { Next::ReconnectToInitialNodes { request: None }.into() } else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect) @@ -855,7 +856,9 @@ impl Future for Request { sleep_duration: None, } .into() - } else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) { + } else if matches!(retry_method, crate::types::RetryMethod::Reconnect) + || matches!(retry_method, crate::types::RetryMethod::ReconnectAndRetry) + { if let OperationTarget::Node { address } = target { Next::Reconnect { request: None, @@ -934,13 +937,18 @@ impl Future for Request { }); self.poll(cx) } - crate::types::RetryMethod::Reconnect => { + crate::types::RetryMethod::Reconnect + | crate::types::RetryMethod::ReconnectAndRetry => { let mut request = this.request.take().unwrap(); // TODO should we reset the redirect here? request.info.reset_routing(); warn!("disconnected from {:?}", address); + let should_retry = matches!( + err.retry_method(), + crate::types::RetryMethod::ReconnectAndRetry + ); Next::Reconnect { - request: Some(request), + request: should_retry.then_some(request), target: address, } .into() @@ -1177,8 +1185,11 @@ where Ok(connections.0) } - fn reconnect_to_initial_nodes(&mut self) -> impl Future { - let inner = self.inner.clone(); + // Reconnet to the initial nodes provided by the user in the creation of the client, + // and try to refresh the slots based on the initial connections. + // Being used when all cluster connections are unavailable. + fn reconnect_to_initial_nodes(inner: Arc>) -> impl Future { + let inner = inner.clone(); async move { let connection_map = match Self::create_initial_connections( &inner.initial_nodes, @@ -1680,7 +1691,9 @@ where Self::refresh_slots_inner(inner, curr_retry) .await .map_err(|err| { - if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES { + if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES + || err.kind() == ErrorKind::AllConnectionsUnavailable + { BackoffError::Permanent(err) } else { BackoffError::from(err) @@ -2073,14 +2086,22 @@ where } ConnectionCheck::RandomConnection => { let read_guard = core.conn_lock.read().await; - let (random_address, random_conn_future) = read_guard + read_guard .random_connections(1, ConnectionType::User) - .next() - .ok_or(RedisError::from(( - ErrorKind::AllConnectionsUnavailable, - "No random connection found", - )))?; - return Ok((random_address, random_conn_future.await)); + .and_then(|mut random_connections| { + random_connections.next().map( + |(random_address, random_conn_future)| async move { + (random_address, random_conn_future.await) + }, + ) + }) + .ok_or_else(|| { + RedisError::from(( + ErrorKind::AllConnectionsUnavailable, + "No random connection found", + )) + })? + .await } }; @@ -2104,10 +2125,19 @@ where } Err(err) => { trace!("Recover slots failed!"); - *future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries( - self.inner.clone(), - &RefreshPolicy::Throttable, - )); + let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable { + ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin( + ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()), + ))) + } else { + ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin( + Self::refresh_slots_and_subscriptions_with_retries( + self.inner.clone(), + &RefreshPolicy::Throttable, + ), + ))) + }; + self.state = next_state; Poll::Ready(Err(err)) } }, @@ -2226,9 +2256,7 @@ where })); } } - Next::Reconnect { - request, target, .. - } => { + Next::Reconnect { request, target } => { poll_flush_action = poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target])); if let Some(request) = request { @@ -2371,7 +2399,7 @@ where } PollFlushAction::ReconnectFromInitialConnections => { self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin( - self.reconnect_to_initial_nodes(), + ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()), ))); } } @@ -2413,8 +2441,19 @@ async fn calculate_topology_from_random_nodes<'a, C>( where C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - let requested_nodes = - read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement); + let requested_nodes = if let Some(random_conns) = + read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement) + { + random_conns + } else { + return ( + Err(RedisError::from(( + ErrorKind::AllConnectionsUnavailable, + "No available connections to refresh slots from", + ))), + vec![], + ); + }; let topology_join_results = futures::future::join_all(requested_nodes.map(|(addr, conn)| async move { let mut conn: C = conn.await; diff --git a/glide-core/redis-rs/redis/src/types.rs b/glide-core/redis-rs/redis/src/types.rs index a024f16a7d..7e303df68a 100644 --- a/glide-core/redis-rs/redis/src/types.rs +++ b/glide-core/redis-rs/redis/src/types.rs @@ -118,6 +118,14 @@ pub enum ErrorKind { /// not native to the system. This is usually the case if /// the cause is another error. IoError, + /// An error indicating that a fatal error occurred while attempting to send a request to the server, + /// meaning the connection was closed before the request was transmitted. Since the server did not process the request, + /// it is safe to retry the request. + FatalSendError, + /// An error indicating that a fatal error occurred while trying to receive a response, + /// likely due to the closure of the underlying connection. It is unclear whether + /// the server processed the request, making it unsafe to retry the request. + FatalReceiveError, /// An error raised that was identified on the client before execution. ClientError, /// An extension error. This is an error created by the server @@ -802,6 +810,7 @@ impl fmt::Debug for RedisError { pub(crate) enum RetryMethod { Reconnect, + ReconnectAndRetry, NoRetry, RetryImmediately, WaitAndRetry, @@ -870,6 +879,10 @@ impl RedisError { ErrorKind::CrossSlot => "cross-slot", ErrorKind::MasterDown => "master down", ErrorKind::IoError => "I/O error", + ErrorKind::FatalSendError => { + "failed to send the request to the server due to a fatal error - the request was not transmitted" + } + ErrorKind::FatalReceiveError => "a fatal error occurred while attempting to receive a response from the server", ErrorKind::ExtensionError => "extension error", ErrorKind::ClientError => "client error", ErrorKind::ReadOnly => "read-only", @@ -942,6 +955,12 @@ impl RedisError { /// Returns true if error was caused by a dropped connection. pub fn is_connection_dropped(&self) -> bool { + if matches!( + self.kind(), + ErrorKind::FatalSendError | ErrorKind::FatalReceiveError + ) { + return true; + } match self.repr { ErrorRepr::IoError(ref err) => matches!( err.kind(), @@ -957,6 +976,7 @@ impl RedisError { pub fn is_unrecoverable_error(&self) -> bool { match self.retry_method() { RetryMethod::Reconnect => true, + RetryMethod::ReconnectAndRetry => true, RetryMethod::NoRetry => false, RetryMethod::RetryImmediately => false, @@ -1064,12 +1084,15 @@ impl RedisError { io::ErrorKind::PermissionDenied => RetryMethod::NoRetry, io::ErrorKind::Unsupported => RetryMethod::NoRetry, + io::ErrorKind::TimedOut => RetryMethod::NoRetry, _ => RetryMethod::RetryImmediately, }, _ => RetryMethod::RetryImmediately, }, ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry, + ErrorKind::FatalReceiveError => RetryMethod::Reconnect, + ErrorKind::FatalSendError => RetryMethod::ReconnectAndRetry, } } } diff --git a/glide-core/redis-rs/redis/tests/test_async.rs b/glide-core/redis-rs/redis/tests/test_async.rs index d16f1e0694..73d14de022 100644 --- a/glide-core/redis-rs/redis/tests/test_async.rs +++ b/glide-core/redis-rs/redis/tests/test_async.rs @@ -569,7 +569,7 @@ mod basic_async { Err(err) => break err, } }; - assert_eq!(err.kind(), ErrorKind::IoError); // Shouldn't this be IoError? + assert_eq!(err.kind(), ErrorKind::FatalSendError); } #[tokio::test] diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs index e6a5984fa7..971a31a809 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs @@ -1015,7 +1015,6 @@ mod cluster_async { let sleep_duration = core::time::Duration::from_millis(100); #[cfg(feature = "tokio-comp")] tokio::time::sleep(sleep_duration).await; - } } panic!("Failed to reach to the expected topology refresh retries. Found={refreshed_calls}, Expected={expected_calls}") @@ -2542,8 +2541,8 @@ mod cluster_async { match port { 6380 => panic!("Node should not be called"), _ => match completed.fetch_add(1, Ordering::SeqCst) { - 0..=1 => Err(Err(RedisError::from(std::io::Error::new( - std::io::ErrorKind::ConnectionReset, + 0..=1 => Err(Err(RedisError::from(( + ErrorKind::FatalSendError, "mock-io-error", )))), _ => Err(Ok(Value::BulkString(b"123".to_vec()))), @@ -2598,6 +2597,81 @@ mod cluster_async { assert_eq!(completed.load(Ordering::SeqCst), 1); } + #[test] + #[serial_test::serial] + fn test_async_cluster_non_retryable_io_error_should_not_retry() { + let name = "test_async_cluster_non_retryable_io_error_should_not_retry"; + let requests = atomic::AtomicUsize::new(0); + let MockEnv { + runtime, + async_connection: mut connection, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(3), + name, + move |cmd: &[u8], _port| { + respond_startup_two_nodes(name, cmd)?; + let i = requests.fetch_add(1, atomic::Ordering::SeqCst); + match i { + 0 => Err(Err(RedisError::from((ErrorKind::IoError, "io-error")))), + _ => { + panic!("Expected not to be retried!") + } + } + }, + ); + runtime + .block_on(async move { + let res = cmd("INCR") + .arg("foo") + .query_async::<_, Option>(&mut connection) + .await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert!(err.is_io_error()); + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + + #[test] + #[serial_test::serial] + fn test_async_cluster_retry_safe_io_error_should_be_retried() { + let name = "test_async_cluster_retry_safe_io_error_should_be_retried"; + let requests = atomic::AtomicUsize::new(0); + let MockEnv { + runtime, + async_connection: mut connection, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(3), + name, + move |cmd: &[u8], _port| { + respond_startup_two_nodes(name, cmd)?; + let i = requests.fetch_add(1, atomic::Ordering::SeqCst); + match i { + 0 => Err(Err(RedisError::from(( + ErrorKind::FatalSendError, + "server didn't receive the request, safe to retry", + )))), + _ => Err(Ok(Value::Int(1))), + } + }, + ); + runtime + .block_on(async move { + let res = cmd("INCR") + .arg("foo") + .query_async::<_, i32>(&mut connection) + .await; + assert!(res.is_ok()); + let value = res.unwrap(); + assert_eq!(value, 1); + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + #[test] #[serial_test::serial] fn test_async_cluster_read_from_primary() { @@ -3186,10 +3260,17 @@ mod cluster_async { }; // wait for new topology discovery + let max_requests = 5; + let mut i = 0; + let mut cmd = redis::cmd("INFO"); + cmd.arg("SERVER"); loop { - let mut cmd = redis::cmd("INFO"); - cmd.arg("SERVER"); - let res = publishing_con + if i == max_requests { + panic!("Failed to recover and discover new topology"); + } + i += 1; + + if let Ok(res) = publishing_con .route_command( &cmd, RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( @@ -3197,21 +3278,21 @@ mod cluster_async { SlotAddr::Master, ))), ) - .await; - assert!(res.is_ok()); - let res = res.unwrap(); - match res { - Value::VerbatimString { format: _, text } => { - if text.contains(format!("tcp_port:{}", last_server_port).as_str()) { - // new topology rediscovered - break; + .await + { + match res { + Value::VerbatimString { format: _, text } => { + if text.contains(format!("tcp_port:{}", last_server_port).as_str()) { + // new topology rediscovered + break; + } + } + _ => { + panic!("Wrong return type for INFO SERVER command: {:?}", res); } } - _ => { - panic!("Wrong return type for INFO SERVER command: {:?}", res); - } + sleep(futures_time::time::Duration::from_secs(1)).await; } - sleep(futures_time::time::Duration::from_secs(1)).await; } // sleep for one one cycle of topology refresh @@ -3250,7 +3331,7 @@ mod cluster_async { if use_sharded { // validate SPUBLISH - let result = cmd("SPUBLISH") + let result = redis::cmd("SPUBLISH") .arg("test_channel_?") .arg("test_message") .query_async(&mut publishing_con) @@ -3757,9 +3838,26 @@ mod cluster_async { false, ); - let result = connection.req_packed_command(&cmd).await.unwrap(); - assert_eq!(result, Value::SimpleString("PONG".to_string())); - Ok::<_, RedisError>(()) + let max_requests = 5; + let mut i = 0; + let mut last_err = None; + loop { + if i == max_requests { + break; + } + i += 1; + match connection.req_packed_command(&cmd).await { + Ok(result) => { + assert_eq!(result, Value::SimpleString("PONG".to_string())); + return Ok::<_, RedisError>(()); + } + Err(err) => { + last_err = Some(err); + let _ = sleep(futures_time::time::Duration::from_secs(1)).await; + } + } + } + panic!("Failed to recover after all nodes went down. Last error: {last_err:?}"); }) .unwrap(); } @@ -3786,19 +3884,37 @@ mod cluster_async { ); let cmd = cmd("PING"); - // explicitly route to all primaries and request all succeeded - let result = connection - .route_command( - &cmd, - RoutingInfo::MultiNode(( - MultipleNodeRoutingInfo::AllMasters, - Some(redis::cluster_routing::ResponsePolicy::AllSucceeded), - )), - ) - .await; - assert!(result.is_ok()); - Ok::<_, RedisError>(()) + let max_requests = 5; + let mut i = 0; + let mut last_err = None; + loop { + if i == max_requests { + break; + } + i += 1; + // explicitly route to all primaries and request all succeeded + match connection + .route_command( + &cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(redis::cluster_routing::ResponsePolicy::AllSucceeded), + )), + ) + .await + { + Ok(result) => { + assert_eq!(result, Value::SimpleString("PONG".to_string())); + return Ok::<_, RedisError>(()); + } + Err(err) => { + last_err = Some(err); + let _ = sleep(futures_time::time::Duration::from_secs(1)).await; + } + } + } + panic!("Failed to recover after all nodes went down. Last error: {last_err:?}"); }) .unwrap(); } @@ -3871,7 +3987,10 @@ mod cluster_async { if connect_attempt > 5 { panic!("Too many pings!"); } - Err(Err(broken_pipe_error())) + Err(Err(RedisError::from(( + ErrorKind::FatalSendError, + "mock-io-error", + )))) } else { respond_startup_two_nodes(name, cmd)?; let past_get_attempts = get_attempts.fetch_add(1, Ordering::Relaxed); @@ -3879,7 +3998,10 @@ mod cluster_async { if past_get_attempts == 0 { // Error once with io-error, ensure connection is reestablished w/out calling // other node (i.e., not doing a full slot rebuild) - Err(Err(broken_pipe_error())) + Err(Err(RedisError::from(( + ErrorKind::FatalSendError, + "mock-io-error", + )))) } else { Err(Ok(Value::BulkString(b"123".to_vec()))) } @@ -3931,7 +4053,7 @@ mod cluster_async { .expect("Failed executing CLIENT LIST"); let mut client_list_parts = client_list.split('\n'); if client_list_parts - .any(|line| line.contains(MANAGEMENT_CONN_NAME) && line.contains("cmd=cluster")) + .any(|line| line.contains(MANAGEMENT_CONN_NAME) && line.contains("cmd=cluster")) && client_list.matches(MANAGEMENT_CONN_NAME).count() == 1 { return Ok::<_, RedisError>(()); } @@ -3983,21 +4105,23 @@ mod cluster_async { } async fn kill_connection(killer_connection: &mut ClusterConnection, connection_to_kill: &str) { + let default_routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode( + Route::new(0, SlotAddr::Master), + )); + kill_connection_with_routing(killer_connection, connection_to_kill, default_routing).await; + } + + async fn kill_connection_with_routing( + killer_connection: &mut ClusterConnection, + connection_to_kill: &str, + routing: RoutingInfo, + ) { let mut cmd = redis::cmd("CLIENT"); cmd.arg("KILL"); cmd.arg("ID"); cmd.arg(connection_to_kill); - // Kill the management connection in the primary node that holds slot 0 - assert!(killer_connection - .route_command( - &cmd, - RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( - 0, - SlotAddr::Master, - )),), - ) - .await - .is_ok()); + // Kill the management connection for the routing node + assert!(killer_connection.route_command(&cmd, routing).await.is_ok()); } #[test] diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index a242eb80d1..c6e2b7b15d 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -172,7 +172,7 @@ mod socket_listener { } fn read_from_socket(buffer: &mut Vec, socket: &mut UnixStream) -> usize { - buffer.resize(100, 0_u8); + buffer.resize(300, 0_u8); socket.read(buffer).unwrap() }