Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid retrying on IO errors when it’s unclear if the server received the request #2479

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::connection::{
resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
Msg, RedisConnectionInfo,
};
#[cfg(any(feature = "tokio-comp"))]
#[cfg(feature = "tokio-comp")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch 👍🏽

use crate::parser::ValueCodec;
use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value};
use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ macro_rules! reconnect_if_dropped {
macro_rules! reconnect_if_io_error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the macro should be renamed to reflect the change, e.g. `reconnect_if_conn_dropped`.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change it, but in a different PR. We need to remove this whole file anyway, since we have no use for redis-rs's connection_manager.

($self:expr, $result:expr, $current:expr) => {
if let Err(e) = $result {
if e.is_io_error() {
if e.is_connection_dropped() {
$self.reconnect($current);
}
return Err(e);
Expand Down
42 changes: 25 additions & 17 deletions glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
&mut self,
item: SinkItem,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
self.send_recv(item, None, timeout).await
}

Expand All @@ -359,7 +359,7 @@ where
// If `None`, this is a single request, not a pipeline of multiple requests.
pipeline_response_count: Option<usize>,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -369,15 +369,29 @@ where
output: sender,
})
.await
.map_err(|_| None)?;
.map_err(|err| {
// If an error occurs here, it means the request never reached the server, as guaranteed
// by the 'send' function. Since the server did not receive the data, it is safe to retry
// the request.
RedisError::from((
crate::ErrorKind::FatalSendError,
"Failed to send the request to the server",
err.to_string(),
))
})?;
match Runtime::locate().timeout(timeout, receiver).await {
Ok(Ok(result)) => result.map_err(Some),
Ok(Err(_)) => {
// The `sender` was dropped which likely means that the stream part
// failed for one reason or another
Err(None)
Ok(Ok(result)) => result,
Ok(Err(err)) => {
// The `sender` was dropped, likely indicating a failure in the stream.
// This error suggests that it's unclear whether the server received the request before the connection failed,
// making it unsafe to retry. For example, retrying an INCR request could result in double increments.
Err(RedisError::from((
crate::ErrorKind::FatalReceiveError,
"Failed to receive a response due to a fatal error",
err.to_string(),
)))
}
Err(elapsed) => Err(Some(elapsed.into())),
Err(elapsed) => Err(elapsed.into()),
}
}

Expand Down Expand Up @@ -503,10 +517,7 @@ impl MultiplexedConnection {
let result = self
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down Expand Up @@ -537,10 +548,7 @@ impl MultiplexedConnection {
Some(offset + count),
self.response_timeout,
)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;

if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
Expand Down
21 changes: 10 additions & 11 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng};

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
use crate::cluster_pipeline::UNROUTABLE_ERROR;
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
use crate::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo,
};
Expand All @@ -54,17 +56,14 @@ use crate::connection::{
connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo,
};
use crate::parser::parse_redis_value;
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value};
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, RetryMethod, Value};
pub use crate::TlsMode; // Pub for backwards compatibility
use crate::{
cluster_client::ClusterParams,
cluster_routing::{Redirect, Route, RoutingInfo},
IntoConnectionInfo, PushInfo,
};

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};

use tokio::sync::mpsc;

#[cfg(feature = "tls-rustls")]
Expand Down Expand Up @@ -749,29 +748,29 @@ where
retries += 1;

match err.retry_method() {
crate::types::RetryMethod::AskRedirect => {
RetryMethod::AskRedirect => {
redirected = err
.redirect_node()
.map(|(node, _slot)| Redirect::Ask(node.to_string()));
}
crate::types::RetryMethod::MovedRedirect => {
RetryMethod::MovedRedirect => {
// Refresh slots.
self.refresh_slots()?;
// Request again.
redirected = err
.redirect_node()
.map(|(node, _slot)| Redirect::Moved(node.to_string()));
}
crate::types::RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica
| crate::types::RetryMethod::WaitAndRetry => {
RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica
| RetryMethod::WaitAndRetry => {
// Sleep and retry.
let sleep_time = self
.cluster_params
.retry_params
.wait_time_for_retry(retries);
thread::sleep(sleep_time);
}
crate::types::RetryMethod::Reconnect => {
RetryMethod::Reconnect | RetryMethod::ReconnectAndRetry => {
if *self.auto_reconnect.borrow() {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
Expand All @@ -780,10 +779,10 @@ where
}
}
}
crate::types::RetryMethod::NoRetry => {
RetryMethod::NoRetry => {
return Err(err);
}
crate::types::RetryMethod::RetryImmediately => {}
RetryMethod::RetryImmediately => {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ where
&self,
amount: usize,
conn_type: ConnectionType,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
(!self.connection_map.is_empty()).then_some({
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
})
}

pub(crate) fn replace_or_add_connection_for_address(
Expand Down Expand Up @@ -633,6 +635,7 @@ mod tests {

let random_connections: HashSet<_> = container
.random_connections(3, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();

Expand All @@ -647,12 +650,9 @@ mod tests {
let container = create_container();
remove_all_connections(&container);

assert_eq!(
0,
container
.random_connections(1, ConnectionType::User)
.count()
);
assert!(container
.random_connections(1, ConnectionType::User)
.is_none());
}

#[test]
Expand All @@ -665,6 +665,7 @@ mod tests {
);
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.collect();

assert_eq!(vec![(address, 4)], random_connections);
Expand All @@ -675,6 +676,7 @@ mod tests {
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand All @@ -687,6 +689,7 @@ mod tests {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::PreferManagement)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand Down
Loading
Loading