Avoid retrying on IO errors when it’s unclear if the server received the request

Signed-off-by: barshaul <barshaul@amazon.com>
barshaul committed Oct 20, 2024
1 parent 6a3a33f commit fe9b603
Showing 8 changed files with 285 additions and 105 deletions.
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
@@ -7,7 +7,7 @@ use crate::connection::{
resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
Msg, RedisConnectionInfo,
};
#[cfg(any(feature = "tokio-comp"))]
#[cfg(feature = "tokio-comp")]
use crate::parser::ValueCodec;
use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value};
use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
36 changes: 21 additions & 15 deletions glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
@@ -349,7 +349,7 @@ where
&mut self,
item: SinkItem,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
self.send_recv(item, None, timeout).await
}

@@ -359,7 +359,7 @@
// If `None`, this is a single request, not a pipeline of multiple requests.
pipeline_response_count: Option<usize>,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
let (sender, receiver) = oneshot::channel();

self.sender
@@ -369,15 +369,27 @@
output: sender,
})
.await
.map_err(|_| None)?;
.map_err(|err| {
// If an error occurs here, it means the request never reached the server, as guaranteed
// by the 'send' function. Since the server did not receive the data, it is safe to retry
// the request.
RedisError::from((
crate::ErrorKind::IoErrorRetrySafe,
"Failed to send the request to the server",
format!("{err}"),
))
})?;
match Runtime::locate().timeout(timeout, receiver).await {
Ok(Ok(result)) => result.map_err(Some),
Ok(Ok(result)) => result,
Ok(Err(_)) => {
// The `sender` was dropped which likely means that the stream part
// failed for one reason or another
Err(None)
// failed for one reason or another.
// Since we don't know if the server received the request, retrying it isn't safe.
// For example, retrying an INCR request could result in double increments.
// Hence, we return an IoError instead of an IoErrorRetrySafe.
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
}
Err(elapsed) => Err(Some(elapsed.into())),
Err(elapsed) => Err(elapsed.into()),
}
}

@@ -503,10 +515,7 @@ impl MultiplexedConnection {
let result = self
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down Expand Up @@ -537,10 +546,7 @@ impl MultiplexedConnection {
Some(offset + count),
self.response_timeout,
)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;

if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
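The two hunks above carry the core of this commit: a failure while handing the request to the sink means the server never received it (retry-safe), while a dropped response channel leaves the outcome unknown (not retry-safe). Below is a minimal sketch of that classification, assuming the glide fork where `ErrorKind::IoErrorRetrySafe` exists; the `classify_failure` helper and its `sent` flag are illustrative, not part of the crate.

```rust
use redis::{ErrorKind, RedisError};

// Illustrative helper (not part of the crate) mirroring send_recv's logic:
// `sent == false` means the sink rejected the request before any bytes
// reached the server, so replaying it cannot double-apply side effects.
fn classify_failure(sent: bool, cause: String) -> RedisError {
    if !sent {
        RedisError::from((
            ErrorKind::IoErrorRetrySafe,
            "Failed to send the request to the server",
            cause,
        ))
    } else {
        // The server may already have executed the command (e.g. an INCR),
        // so a retry is unsafe; report a plain, non-retry-safe I/O error.
        RedisError::from(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
    }
}

fn main() {
    let err = classify_failure(false, "sink closed".into());
    assert_eq!(err.kind(), ErrorKind::IoErrorRetrySafe);
}
```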
3 changes: 2 additions & 1 deletion glide-core/redis-rs/redis/src/cluster.rs
@@ -771,7 +771,8 @@ where
.wait_time_for_retry(retries);
thread::sleep(sleep_time);
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
if *self.auto_reconnect.borrow() {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
@@ -255,16 +255,18 @@ where
&self,
amount: usize,
conn_type: ConnectionType,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
(!self.connection_map.is_empty()).then_some({
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
})
}

pub(crate) fn replace_or_add_connection_for_address(
@@ -633,6 +635,7 @@ mod tests {

let random_connections: HashSet<_> = container
.random_connections(3, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();

@@ -647,12 +650,9 @@
let container = create_container();
remove_all_connections(&container);

assert_eq!(
0,
container
.random_connections(1, ConnectionType::User)
.count()
);
assert!(container
.random_connections(1, ConnectionType::User)
.is_none());
}

#[test]
@@ -665,6 +665,7 @@
);
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.collect();

assert_eq!(vec![(address, 4)], random_connections);
@@ -675,6 +676,7 @@
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
@@ -687,6 +689,7 @@
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::PreferManagement)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
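The signature change from a bare iterator to `Option<impl Iterator<...>>` lets callers tell "the container is empty" apart from "the sample came back empty". A self-contained sketch of the same shape; the `Container` type here is a stand-in, and random sampling is simplified to `take` (the real code samples with rand's `choose_multiple`).

```rust
struct Container {
    addrs: Vec<String>,
}

impl Container {
    // Mirrors the new shape: None when there are no connections at all,
    // Some(iterator) otherwise.
    fn random_connections(&self, amount: usize) -> Option<impl Iterator<Item = String> + '_> {
        (!self.addrs.is_empty()).then(|| self.addrs.iter().take(amount).cloned())
    }
}

fn main() {
    let empty = Container { addrs: vec![] };
    assert!(empty.random_connections(1).is_none());

    let one = Container { addrs: vec!["node1:6379".into()] };
    // Asking for more connections than exist yields what is available.
    assert_eq!(one.random_connections(3).expect("non-empty").count(), 1);
}
```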
85 changes: 62 additions & 23 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -845,6 +845,7 @@ impl<C> Future for Request<C> {
let request = this.request.as_mut().unwrap();
// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let retry_method = err.retry_method();
let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
@@ -855,7 +856,9 @@
sleep_duration: None,
}
.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
} else if matches!(retry_method, crate::types::RetryMethod::Reconnect)
|| matches!(retry_method, crate::types::RetryMethod::ReconnectAndRetry)
{
if let OperationTarget::Node { address } = target {
Next::Reconnect {
request: None,
@@ -934,13 +937,18 @@ impl<C> Future for Request<C> {
});
self.poll(cx)
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
let mut request = this.request.take().unwrap();
// TODO should we reset the redirect here?
request.info.reset_routing();
warn!("disconnected from {:?}", address);
let should_retry = matches!(
err.retry_method(),
crate::types::RetryMethod::ReconnectAndRetry
);
Next::Reconnect {
request: Some(request),
request: should_retry.then_some(request),
target: address,
}
.into()
@@ -1177,8 +1185,11 @@ where
Ok(connections.0)
}

fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
let inner = self.inner.clone();
// Reconnect to the initial nodes provided by the user when the client was created,
// and try to refresh the slots based on the initial connections.
// Used when all cluster connections are unavailable.
fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
let inner = inner.clone();
async move {
let connection_map = match Self::create_initial_connections(
&inner.initial_nodes,
@@ -1680,7 +1691,9 @@ where
Self::refresh_slots_inner(inner, curr_retry)
.await
.map_err(|err| {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES
|| err.kind() == ErrorKind::AllConnectionsUnavailable
{
BackoffError::Permanent(err)
} else {
BackoffError::from(err)
@@ -2073,14 +2086,22 @@ where
}
ConnectionCheck::RandomConnection => {
let read_guard = core.conn_lock.read().await;
let (random_address, random_conn_future) = read_guard
read_guard
.random_connections(1, ConnectionType::User)
.next()
.ok_or(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
.and_then(|mut random_connections| {
random_connections.next().map(
|(random_address, random_conn_future)| async move {
(random_address, random_conn_future.await)
},
)
})
.ok_or_else(|| {
RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
))
})?
.await
}
};

@@ -2104,10 +2125,19 @@
}
Err(err) => {
trace!("Recover slots failed!");
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
));
let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)))
} else {
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
),
)))
};
self.state = next_state;
Poll::Ready(Err(err))
}
},
@@ -2226,9 +2256,7 @@ where
}));
}
}
Next::Reconnect {
request, target, ..
} => {
Next::Reconnect { request, target } => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
if let Some(request) = request {
@@ -2371,7 +2399,7 @@ where
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.reconnect_to_initial_nodes(),
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)));
}
}
@@ -2413,8 +2441,19 @@ async fn calculate_topology_from_random_nodes<'a, C>(
where
C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
let requested_nodes =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement);
let requested_nodes = if let Some(random_conns) =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
{
random_conns
} else {
return (
Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No available connections to refresh slots from",
))),
vec![],
);
};
let topology_join_results =
futures::future::join_all(requested_nodes.map(|(addr, conn)| async move {
let mut conn: C = conn.await;
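Across this file, `ReconnectAndRetry` is what carries the retry-safety signal into the request loop: `should_retry.then_some(request)` keeps the in-flight request queued for replay only when the server provably never received it. A reduced sketch of that decision, with the enum trimmed to the two relevant variants:

```rust
// Reduced sketch of the requeue decision in the poll loop above.
enum RetryMethod {
    Reconnect,         // reconnect, but drop the in-flight request
    ReconnectAndRetry, // reconnect and replay: the server never saw the request
}

fn requeue_on_reconnect<R>(method: RetryMethod, request: R) -> Option<R> {
    // Mirrors `should_retry.then_some(request)` from the hunk above.
    matches!(method, RetryMethod::ReconnectAndRetry).then_some(request)
}

fn main() {
    assert!(requeue_on_reconnect(RetryMethod::Reconnect, "INCR counter").is_none());
    assert!(requeue_on_reconnect(RetryMethod::ReconnectAndRetry, "INCR counter").is_some());
}
```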
7 changes: 7 additions & 0 deletions glide-core/redis-rs/redis/src/types.rs
@@ -118,6 +118,8 @@ pub enum ErrorKind {
/// not native to the system. This is usually the case if
/// the cause is another error.
IoError,
/// An I/O error that is considered safe to retry as the request was not received by the server
IoErrorRetrySafe,
/// An error raised that was identified on the client before execution.
ClientError,
/// An extension error. This is an error created by the server
@@ -802,6 +804,7 @@ impl fmt::Debug for RedisError {

pub(crate) enum RetryMethod {
Reconnect,
ReconnectAndRetry,
NoRetry,
RetryImmediately,
WaitAndRetry,
@@ -870,6 +873,7 @@ impl RedisError {
ErrorKind::CrossSlot => "cross-slot",
ErrorKind::MasterDown => "master down",
ErrorKind::IoError => "I/O error",
ErrorKind::IoErrorRetrySafe => "I/O error - Request wasn't received by the server",
ErrorKind::ExtensionError => "extension error",
ErrorKind::ClientError => "client error",
ErrorKind::ReadOnly => "read-only",
@@ -957,6 +961,7 @@ impl RedisError {
pub fn is_unrecoverable_error(&self) -> bool {
match self.retry_method() {
RetryMethod::Reconnect => true,
RetryMethod::ReconnectAndRetry => true,

RetryMethod::NoRetry => false,
RetryMethod::RetryImmediately => false,
@@ -1064,12 +1069,14 @@

io::ErrorKind::PermissionDenied => RetryMethod::NoRetry,
io::ErrorKind::Unsupported => RetryMethod::NoRetry,
io::ErrorKind::TimedOut => RetryMethod::NoRetry,

_ => RetryMethod::RetryImmediately,
},
_ => RetryMethod::RetryImmediately,
},
ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry,
ErrorKind::IoErrorRetrySafe => RetryMethod::ReconnectAndRetry,
}
}
}
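Since `RetryMethod` is `pub(crate)`, the new routing is only observable from inside the crate. A test sketch of the two mappings this hunk adds (`IoErrorRetrySafe` to `ReconnectAndRetry`, and `TimedOut` to `NoRetry`); it would have to live inside redis-rs, for instance in types.rs:

```rust
// In-crate test sketch: RetryMethod is pub(crate), so this cannot be
// written against the public API.
#[cfg(test)]
mod retry_method_mapping {
    use crate::types::RetryMethod;
    use crate::{ErrorKind, RedisError};

    #[test]
    fn retry_safe_io_error_reconnects_and_retries() {
        let err = RedisError::from((
            ErrorKind::IoErrorRetrySafe,
            "Failed to send the request to the server",
        ));
        assert!(matches!(err.retry_method(), RetryMethod::ReconnectAndRetry));
    }

    #[test]
    fn timed_out_io_error_is_not_retried() {
        let err = RedisError::from(std::io::Error::from(std::io::ErrorKind::TimedOut));
        assert!(matches!(err.retry_method(), RetryMethod::NoRetry));
    }
}
```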
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/tests/test_async.rs
@@ -569,7 +569,7 @@ mod basic_async {
Err(err) => break err,
}
};
assert_eq!(err.kind(), ErrorKind::IoError); // Shouldn't this be IoError?
assert_eq!(err.kind(), ErrorKind::IoErrorRetrySafe);
}

#[tokio::test]